Basic remote building

This removes the need for Nix's build-remote.pl.

Build logs are now written to $HYDRA_DATA/build-logs because
hydra-queue-runner doesn't have write permission to /nix/var/log.
This commit is contained in:
Eelco Dolstra 2015-06-09 14:21:21 +02:00
parent 3a6cb2f270
commit 8b12ac1f6d
5 changed files with 445 additions and 53 deletions

View file

@ -1,6 +1,6 @@
bin_PROGRAMS = hydra-queue-runner
hydra_queue_runner_SOURCES = hydra-queue-runner.cc build-result.cc
hydra_queue_runner_SOURCES = hydra-queue-runner.cc build-result.cc build-remote.cc
hydra_queue_runner_LDADD = $(NIX_LIBS) -lpqxx
AM_CXXFLAGS = $(NIX_CFLAGS) -Wall

View file

@ -0,0 +1,170 @@
#include <algorithm>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "build-remote.hh"
#include "util.hh"
#include "misc.hh"
#include "serve-protocol.hh"
#include "worker-protocol.hh"
using namespace nix;
/* Bundles the process started by openConnection() with the parent's
   ends of the two pipes that are attached to it. */
struct Child
{
    /* The spawned process. */
    Pid pid;

    /* Write side of the pipe whose read side is the child's stdin. */
    AutoCloseFD to;

    /* Read side of the pipe whose write side is the child's stdout. */
    AutoCloseFD from;
};
static void openConnection(const string & sshName, const string & sshKey,
int stderrFD, Child & child)
{
Pipe to, from;
to.create();
from.create();
child.pid = startProcess([&]() {
if (dup2(to.readSide, STDIN_FILENO) == -1)
throw SysError("cannot dup input pipe to stdin");
if (dup2(from.writeSide, STDOUT_FILENO) == -1)
throw SysError("cannot dup output pipe to stdout");
if (dup2(stderrFD, STDERR_FILENO) == -1)
throw SysError("cannot dup stderr");
Strings argv({"ssh", "-x", "-a", sshName, "--", "nix-store", "--serve", "--write"});
execvp("ssh", (char * *) stringsToCharPtrs(argv).data()); // FIXME: remove cast
throw SysError("cannot start ssh");
});
to.readSide.close();
from.writeSide.close();
child.to = to.writeSide.borrow();
child.from = from.readSide.borrow();
}
/* Ask the remote side which paths in the closure of `paths` it
   already has.  Sending the paths that are missing is not implemented
   yet: if any path is missing, an Error is thrown.

   store:          local store used to compute and sort the closure.
   from / to:      connection to the remote `nix-store --serve`.
   paths:          roots whose closure is required remotely.
   useSubstitutes: forwarded to the remote in the query; asks it to
                   substitute missing paths. */
static void copyClosureTo(std::shared_ptr<StoreAPI> store,
    FdSource & from, FdSink & to, const PathSet & paths,
    bool useSubstitutes = false)
{
    /* Collect everything reachable from the requested roots. */
    PathSet closure;
    for (const auto & root : paths)
        computeFSClosure(*store, root, closure);

    Paths sorted = topoSortPaths(*store, closure);

    /* Issue "query valid paths" with locking turned on, so that the
       remote host cannot garbage-collect paths it already has while
       we rely on them.  The remote may optionally substitute paths
       it lacks. */
    writeInt(cmdQueryValidPaths, to);
    writeInt(1, to); // == lock paths
    writeInt(useSubstitutes, to);
    writeStrings(sorted, to);
    to.flush();

    /* The reply is the subset of paths that is valid remotely. */
    auto present = readStorePaths<PathSet>(from);

    /* Whatever is in the closure but not in the reply is missing. */
    PathSet missing;
    for (const auto & path : closure)
        if (present.find(path) == present.end())
            missing.insert(path);

    printMsg(lvlError, format("sending %1% missing paths") % missing.size());

    if (!missing.empty())
        throw Error("NOT IMPL 1");
}
/* Request an export of `paths` from the remote side and import the
   resulting stream into the local store.

   store:     local store that receives the paths.
   from / to: connection to the remote `nix-store --serve`.
   paths:     paths to fetch. */
static void copyClosureFrom(std::shared_ptr<StoreAPI> store,
    FdSource & from, FdSink & to, const PathSet & paths)
{
    const unsigned int sign = 0; // == don't sign

    /* Send the request... */
    writeInt(cmdExportPaths, to);
    writeInt(sign, to);
    writeStrings(paths, to);
    to.flush();

    /* ...and consume the reply directly from the connection. */
    store->importPaths(false, from);
}
/* Build the derivation `drvPath` on the machine `sshName` through
   `nix-store --serve`, and copy its outputs into `store`.

   The remote process's stderr is written to a log file at
   <logDir>/<first two characters of the derivation's base name>/<rest
   of the base name>.

   On return, `result.status` is set and `result.startTime` /
   `result.stopTime` bracket the remote build command; on a failed
   build `result.errorMsg` holds the message read from the remote,
   suffixed with the machine name.  Setup and protocol problems are
   thrown as Error / SysError. */
void buildRemote(std::shared_ptr<StoreAPI> store,
    const string & sshName, const string & sshKey,
    const Path & drvPath, const Derivation & drv,
    const nix::Path & logDir, RemoteResult & result)
{
    /* Work out the log file location and create its directory. */
    const string drvName = baseNameOf(drvPath);
    const Path logPath = logDir + "/" + string(drvName, 0, 2) + "/" + string(drvName, 2);
    createDirs(dirOf(logPath));

    AutoCloseFD logFD(open(logPath.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0666));
    if (logFD == -1) throw SysError(format("creating log file %1%") % logPath);

    /* Start ssh with its stderr pointed at the log.  Once the child
       has it, our copy of the descriptor is no longer needed. */
    Child child;
    openConnection(sshName, sshKey, logFD, child);
    logFD.close();

    FdSource from(child.from);
    FdSink to(child.to);

    /* Handshake. */
    writeInt(SERVE_MAGIC_1, to);
    writeInt(SERVE_PROTOCOL_VERSION, to);
    to.flush();

    if (readInt(from) != SERVE_MAGIC_2)
        throw Error(format("protocol mismatch with nix-store --serve on %1%") % sshName);

    unsigned int remoteVersion = readInt(from);
    if (GET_PROTOCOL_MAJOR(remoteVersion) != 0x200)
        throw Error(format("unsupported nix-store --serve protocol version on %1%") % sshName);

    const PathSet drvPaths({drvPath});

    /* Copy the input closure. */
    printMsg(lvlError, format("sending closure of %1% to %2%") % drvPath % sshName);
    copyClosureTo(store, from, to, drvPaths);

    /* Do the build. */
    printMsg(lvlError, format("building %1% on %2%") % drvPath % sshName);
    writeInt(cmdBuildPaths, to);
    writeStrings(drvPaths, to);
    writeInt(3600, to); // == maxSilentTime, FIXME
    writeInt(7200, to); // == buildTimeout, FIXME
    to.flush();

    result.startTime = time(0);
    const int exitCode = readInt(from);
    result.stopTime = time(0);

    if (exitCode != 0) {
        /* A non-zero reply is followed by an error string. */
        result.errorMsg = (format("%1% on %2%") % readString(from) % sshName).str();
        switch (exitCode) {
            case 100:
                result.status = RemoteResult::rrPermanentFailure;
                break;
            case 101:
                result.status = RemoteResult::rrTimedOut;
                break;
            default:
                result.status = RemoteResult::rrMiscFailure;
                break;
        }
        return;
    }

    /* Copy the output paths. */
    printMsg(lvlError, format("copying outputs of %1% from %2%") % drvPath % sshName);
    PathSet outputPaths;
    for (auto & out : drv.outputs)
        outputPaths.insert(out.second.path);
    copyClosureFrom(store, from, to, outputPaths);

    /* Shut down the connection. */
    child.to.close();
    child.pid.wait(true);

    result.status = RemoteResult::rrSuccess;
}

View file

@ -0,0 +1,21 @@
#pragma once
#include "store-api.hh"
#include "derivations.hh"
/* Outcome of a remote build as reported by buildRemote(). */
struct RemoteResult
{
    /* Final state of the build; starts out as a miscellaneous
       failure until something sets it otherwise. */
    enum {
        rrSuccess = 0,
        rrPermanentFailure,
        rrTimedOut,
        rrMiscFailure
    } status = rrMiscFailure;

    /* Error text for a failed build; empty by default. */
    std::string errorMsg;

    /* Start and stop timestamps; 0 means "not set". */
    time_t startTime = 0;
    time_t stopTime = 0;
};
/* Build the derivation `drvPath` (parsed form: `drv`) on the machine
   `sshName` and copy its outputs into `store`.

   store:   local store; source of the input closure and destination
            of the imported outputs.
   sshName: host to connect to with ssh.
   sshKey:  ssh key configured for that machine.
   logDir:  directory under which the build log file is created.
   result:  receives the status, the start/stop times and, for a
            failed build, the error message.

   Failures to set up the connection or a protocol mismatch are
   reported by throwing rather than through `result`. */
void buildRemote(std::shared_ptr<nix::StoreAPI> store,
const std::string & sshName, const std::string & sshKey,
const nix::Path & drvPath, const nix::Derivation & drv,
const nix::Path & logDir, RemoteResult & result);

View file

@ -8,6 +8,7 @@
#include <pqxx/pqxx>
#include "build-result.hh"
#include "build-remote.hh"
#include "sync.hh"
#include "pool.hh"
@ -134,6 +135,10 @@ struct Step
Sync<State> state;
std::atomic_bool destroyed;
Step() : destroyed(false) { }
~Step()
{
printMsg(lvlError, format("destroying step %1%") % drvPath);
@ -141,11 +146,49 @@ struct Step
};
struct Machine
{
typedef std::shared_ptr<Machine> ptr;
std::string sshName, sshKey;
std::set<std::string> systemTypes, supportedFeatures, mandatoryFeatures;
unsigned int maxJobs = 1;
float speedFactor = 1.0;
Sync<unsigned int> currentJobs;
Machine()
{
auto currentJobs_(currentJobs.lock());
*currentJobs_ = 0;
}
};
/* A RAII helper that manages the currentJobs field of Machine
   objects: constructing a reservation increments the machine's
   currentJobs counter, destroying it decrements the counter again.

   Reservations are meant to be shared through MachineReservation::ptr.
   Copying is disabled, because a copy would decrement the counter a
   second time for a single increment. */
struct MachineReservation
{
    typedef std::shared_ptr<MachineReservation> ptr;

    /* The machine on which a job slot is held. */
    Machine::ptr machine;

    /* Take one job slot on `machine`. */
    MachineReservation(Machine::ptr machine) : machine(machine)
    {
        auto currentJobs_(machine->currentJobs.lock());
        (*currentJobs_)++;
    }

    MachineReservation(const MachineReservation &) = delete;
    MachineReservation & operator =(const MachineReservation &) = delete;

    /* Give the job slot back. */
    ~MachineReservation()
    {
        auto currentJobs_(machine->currentJobs.lock());
        /* Never let the unsigned counter wrap around. */
        if (*currentJobs_ > 0) (*currentJobs_)--;
    }
};
class State
{
private:
std::thread queueMonitorThread;
Path hydraData, logDir;
/* CV for waking up the queue. */
std::condition_variable queueMonitorWakeup;
@ -168,20 +211,35 @@ private:
std::condition_variable_any runnableWakeup;
/* CV for waking up the dispatcher. */
std::condition_variable dispatcherWakeup;
std::mutex dispatcherMutex;
/* PostgreSQL connection pool. */
Pool<Connection> dbPool;
/* The build machines. */
typedef std::list<Machine::ptr> Machines;
Sync<Machines> machines;
/* The currently active builder threads. FIXME: We could re-use
these, but since they're fairly long-running, it's probably not
worth it. */
// std::vector<std::thread> builderThreads;
public:
State();
~State();
void loadMachines();
void markActiveBuildStepsAsAborted(time_t stopTime);
int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step,
BuildStepStatus status, const std::string & errorMsg = "", BuildID propagatedFrom = 0);
void finishBuildStep(pqxx::work & txn, time_t stopTime, BuildID buildId, int stepNr,
void finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, BuildID buildId, int stepNr,
BuildStepStatus status, const string & errorMsg = "", BuildID propagatedFrom = 0);
void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status);
@ -200,9 +258,17 @@ public:
void makeRunnable(Step::ptr step);
void builderThreadEntry(int slot);
/* The thread that selects and starts runnable builds. */
void dispatcher();
void doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step);
void wakeDispatcher();
MachineReservation::ptr findMachine(Step::ptr step);
void builder(Step::ptr step, MachineReservation::ptr reservation);
void doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
Machine::ptr machine);
void markSucceededBuild(pqxx::work & txn, Build::ptr build,
const BuildResult & res, bool isCachedBuild, time_t startTime, time_t stopTime);
@ -213,6 +279,10 @@ public:
/* Read the data directory from $HYDRA_DATA (which must be set) and
   derive the build log directory from it. */
State::State()
{
    hydraData = getEnv("HYDRA_DATA");

    if (hydraData.empty())
        throw Error("$HYDRA_DATA must be set");

    /* Build logs live in a fixed subdirectory of the data directory. */
    logDir = canonPath(hydraData + "/build-logs");
}
@ -227,6 +297,49 @@ State::~State()
}
void State::loadMachines()
{
Path machinesFile = getEnv("NIX_REMOTE_SYSTEMS", "/etc/nix/machines");
Machines newMachines;
if (pathExists(machinesFile)) {
for (auto line : tokenizeString<Strings>(readFile(machinesFile), "\n")) {
line = trim(string(line, 0, line.find('#')));
auto tokens = tokenizeString<std::vector<std::string>>(line);
if (tokens.size() < 3) continue;
tokens.resize(7);
auto machine = std::make_shared<Machine>();
machine->sshName = tokens[0];
machine->systemTypes = tokenizeString<StringSet>(tokens[1], ",");
machine->sshKey = tokens[2];
if (tokens[3] != "")
string2Int(tokens[3], machine->maxJobs);
else
machine->maxJobs = 1;
machine->speedFactor = atof(tokens[4].c_str());
machine->supportedFeatures = tokenizeString<StringSet>(tokens[5], ",");
machine->mandatoryFeatures = tokenizeString<StringSet>(tokens[6], ",");
newMachines.push_back(machine);
}
} else {
auto machine = std::make_shared<Machine>();
machine->sshName = "localhost";
machine->systemTypes = StringSet({settings.thisSystem});
if (settings.thisSystem == "x86_64-linux")
machine->systemTypes.insert("i686-linux");
machine->maxJobs = settings.maxBuildJobs;
newMachines.push_back(machine);
}
auto machines_(machines.lock());
*machines_ = newMachines;
}
void State::markActiveBuildStepsAsAborted(time_t stopTime)
{
auto conn(dbPool.get());
@ -262,15 +375,17 @@ int State::createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build,
}
void State::finishBuildStep(pqxx::work & txn, time_t stopTime, BuildID buildId, int stepNr,
void State::finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, BuildID buildId, int stepNr,
BuildStepStatus status, const std::string & errorMsg, BuildID propagatedFrom)
{
assert(startTime);
assert(stopTime);
txn.parameterized
("update BuildSteps set busy = 0, status = $1, propagatedFrom = $4, errorMsg = $5, stopTime = $6 where build = $2 and stepnr = $3")
("update BuildSteps set busy = 0, status = $1, propagatedFrom = $4, errorMsg = $5, startTime = $6, stopTime = $7 where build = $2 and stepnr = $3")
((int) status)(buildId)(stepNr)
(propagatedFrom, propagatedFrom != 0)
(errorMsg, errorMsg != "")
(stopTime, stopTime != 0).exec();
(startTime)(stopTime).exec();
}
@ -346,6 +461,7 @@ void State::getQueuedBuilds(std::shared_ptr<StoreAPI> store)
if (!store->isValidPath(build->drvPath)) {
/* Derivation has been GC'ed prematurely. */
printMsg(lvlInfo, format("aborting GC'ed build %1%") % build->id);
pqxx::work txn(*conn);
txn.parameterized
("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1")
@ -366,6 +482,8 @@ void State::getQueuedBuilds(std::shared_ptr<StoreAPI> store)
Derivation drv = readDerivation(build->drvPath);
BuildResult res = getBuildResult(store, drv);
printMsg(lvlInfo, format("cached build %1%") % build->id);
pqxx::work txn(*conn);
time_t now = time(0);
markSucceededBuild(txn, build, res, true, now, now);
@ -385,6 +503,9 @@ void State::getQueuedBuilds(std::shared_ptr<StoreAPI> store)
build->toplevel = step;
}
printMsg(lvlInfo, format("added build %1% (top-level step %2%, %3% new runnable steps)")
% build->id % step->drvPath % newRunnable.size());
/* Prior to this, the build is not visible to
getDependentBuilds(). Now it is, so the build can be
failed if a dependency fails. (It can't succeed right away
@ -462,6 +583,9 @@ Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPat
void State::destroyStep(Step::ptr step, bool proceed)
{
if (step->destroyed) return;
step->destroyed = true;
printMsg(lvlInfo, format("destroying build step %1%") % step->drvPath);
{
@ -547,6 +671,8 @@ std::set<Build::ptr> State::getDependentBuilds(Step::ptr step)
void State::makeRunnable(Step::ptr step)
{
printMsg(lvlInfo, format("step %1% is now runnable") % step->drvPath);
{
auto step_(step->state.lock());
assert(step_->deps.empty());
@ -557,38 +683,109 @@ void State::makeRunnable(Step::ptr step)
runnable_->push_back(step);
}
runnableWakeup.notify_one();
wakeDispatcher();
}
void State::builderThreadEntry(int slot)
void State::dispatcher()
{
auto store = openStore(); // FIXME: pool
while (!exitRequested) {
printMsg(lvlError, "dispatcher woken up");
while (true) {
/* Sleep until a runnable build step becomes available. */
Step::ptr step;
{
auto runnable_(runnable.lock());
while (runnable_->empty() && !exitRequested)
runnable_.wait(runnableWakeup);
if (exitRequested) break;
auto weak = *runnable_->begin();
runnable_->pop_front();
step = weak.lock();
if (!step) continue;
printMsg(lvlError, format("%1% runnable builds") % runnable_->size());
/* FIXME: we're holding the runnable lock too long
here. This could be more efficient. */
for (auto i = runnable_->begin(); i != runnable_->end(); ) {
auto step = i->lock();
/* Delete dead steps. */
if (!step) {
i = runnable_->erase(i);
continue;
}
auto reservation = findMachine(step);
if (!reservation) {
printMsg(lvlError, format("cannot execute step %1% right now") % step->drvPath);
++i;
continue;
}
printMsg(lvlInfo, format("WOOHOO: starting step %1% on machine %2%")
% step->drvPath % reservation->machine->sshName);
i = runnable_->erase(i);
auto builderThread = std::thread(&State::builder, this, step, reservation);
builderThread.detach(); // FIXME?
}
}
/* Build it. */
printMsg(lvlError, format("slot %1%: got build step %2%") % slot % step->drvPath);
doBuildStep(store, step);
/* Sleep until we're woken up (either because a runnable build
is added, or because a build finishes). */
{
std::unique_lock<std::mutex> lock(dispatcherMutex);
dispatcherWakeup.wait(lock);
}
}
printMsg(lvlError, "builder thread exits");
printMsg(lvlError, "dispatcher exits");
}
void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step)
/* Wake up the dispatcher, which waits on dispatcherWakeup while
   holding dispatcherMutex. */
void State::wakeDispatcher()
{
    /* Acquire and immediately release the mutex as a barrier before
       notifying. */
    dispatcherMutex.lock();
    dispatcherMutex.unlock();

    dispatcherWakeup.notify_all();
}
/* Look for a machine that supports the platform of `step` and has a
   free job slot.  Returns a reservation on the first such machine,
   or a null pointer if there is none. */
MachineReservation::ptr State::findMachine(Step::ptr step)
{
    auto machines_(machines.lock());

    for (auto & candidate : *machines_) {

        const bool platformOk = has(candidate->systemTypes, step->drv.platform);

        // FIXME: check features

        if (platformOk) {
            bool slotFree;
            {
                /* Only hold the counter lock for the comparison; the
                   reservation constructor takes it again itself. */
                auto jobs(candidate->currentJobs.lock());
                slotFree = *jobs < candidate->maxJobs;
            }

            if (slotFree)
                return std::make_shared<MachineReservation>(candidate);
        }
    }

    /* FIXME: distinguish between permanent failures (a matching
       machine doesn't exist) and temporary failures (a matching
       machine is not available). */
    return nullptr;
}
/* Thread body for a single build: run `step` on the reserved
   machine, then release the reservation and wake the dispatcher.
   Exceptions derived from std::exception are logged, not rethrown. */
void State::builder(Step::ptr step, MachineReservation::ptr reservation)
{
    try {
        // FIXME: pool
        doBuildStep(openStore(), step, reservation->machine);
    } catch (const std::exception & err) {
        printMsg(lvlError, format("build thread for %1%: %2%") % step->drvPath % err.what());
        // FIXME: put step back in runnable and retry
    }

    /* Release the machine and wake up the dispatcher.  We must be
       the last owner, so that dropping the pointer really frees the
       job slot. */
    assert(reservation.unique());
    reservation.reset();
    wakeDispatcher();

    printMsg(lvlError, "builder exits");
}
void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
Machine::ptr machine)
{
/* There can be any number of builds in the database that depend
on this derivation. Arbitrarily pick one (though preferring a
@ -625,27 +822,28 @@ void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step)
/* Create a build step record indicating that we started
building. */
auto conn(dbPool.get());
time_t startTime = time(0);
RemoteResult result;
result.startTime = time(0);
int stepNr;
{
pqxx::work txn(*conn);
stepNr = createBuildStep(txn, startTime, build, step, bssBusy);
stepNr = createBuildStep(txn, result.startTime, build, step, bssBusy);
txn.commit();
}
bool success = false;
std::string errorMsg;
try {
store->buildPaths(PathSet({step->drvPath}));
success = true;
buildRemote(store, machine->sshName, machine->sshKey, step->drvPath, step->drv, logDir, result);
} catch (Error & e) {
errorMsg = e.msg();
result.status = RemoteResult::rrMiscFailure;
result.errorMsg = e.msg();
printMsg(lvlError, format("ERROR: %1%") % e.msg());
abort();
}
time_t stopTime = time(0);
if (!result.stopTime) result.stopTime = time(0);
BuildResult res;
if (success) res = getBuildResult(store, step->drv);
if (result.status == RemoteResult::rrSuccess) res = getBuildResult(store, step->drv);
// FIXME: handle failed-with-output
@ -676,33 +874,34 @@ void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step)
{
pqxx::work txn(*conn);
if (success) {
if (result.status == RemoteResult::rrSuccess) {
finishBuildStep(txn, stopTime, build->id, stepNr, bssSuccess);
finishBuildStep(txn, result.startTime, result.stopTime, build->id, stepNr, bssSuccess);
/* Mark all builds of which this derivation is the top
level as succeeded. */
for (auto build2 : direct)
markSucceededBuild(txn, build2, res, false, startTime, stopTime);
markSucceededBuild(txn, build2, res, false, result.startTime, result.stopTime);
} else {
/* Create failed build steps for every build that depends
on this. */
finishBuildStep(txn, stopTime, build->id, stepNr, bssFailed, errorMsg);
finishBuildStep(txn, result.startTime, result.stopTime, build->id, stepNr, bssFailed, result.errorMsg);
for (auto build2 : dependents) {
if (build == build2) continue;
createBuildStep(txn, stopTime, build2, step, bssFailed, errorMsg, build->id);
createBuildStep(txn, result.stopTime, build2, step, bssFailed, result.errorMsg, build->id);
}
/* Mark all builds that depend on this derivation as failed. */
for (auto build2 : dependents) {
printMsg(lvlError, format("marking build %1% as failed") % build2->id);
txn.parameterized
("update Builds set finished = 1, isCachedBuild = 0, buildStatus = $2, startTime = $3, stopTime = $4 where id = $1")
(build2->id)
((int) (build2->drvPath == step->drvPath ? bsFailed : bsDepFailed))
(startTime)
(stopTime).exec();
(result.startTime)
(result.stopTime).exec();
build2->finishedInDB = true; // FIXME: txn might fail
}
}
@ -715,7 +914,7 @@ void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step)
dependent Build objects. Any Steps not referenced by other
Builds will be destroyed as well. */
for (auto build2 : dependents)
if (build2->toplevel == step || !success) {
if (build2->toplevel == step || result.status != RemoteResult::rrSuccess) {
auto builds_(builds.lock());
builds_->erase(build2->id);
}
@ -723,13 +922,15 @@ void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step)
/* Remove the step from the graph. In case of success, make
dependent build steps runnable if they have no other
dependencies. */
destroyStep(step, success);
destroyStep(step, result.status == RemoteResult::rrSuccess);
}
void State::markSucceededBuild(pqxx::work & txn, Build::ptr build,
const BuildResult & res, bool isCachedBuild, time_t startTime, time_t stopTime)
{
printMsg(lvlError, format("marking build %1% as succeeded") % build->id);
txn.parameterized
("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $4, size = $5, closureSize = $6, releaseName = $7, isCachedBuild = $8 where id = $1")
(build->id)
@ -765,11 +966,11 @@ void State::run()
{
markActiveBuildStepsAsAborted(0);
queueMonitorThread = std::thread(&State::queueMonitor, this);
loadMachines();
std::vector<std::thread> builderThreads;
for (int n = 0; n < 4; n++)
builderThreads.push_back(std::thread(&State::builderThreadEntry, this, n));
auto queueMonitorThread = std::thread(&State::queueMonitor, this);
auto dispatcherThread = std::thread(&State::dispatcher, this);
/* Wait for SIGINT. */
{
@ -785,9 +986,8 @@ void State::run()
queueMonitorWakeup.notify_all();
queueMonitorThread.join();
{ auto runnable_(runnable.lock()); } // barrier
runnableWakeup.notify_all();
for (auto & thread : builderThreads) thread.join();
wakeDispatcher();
dispatcherThread.join();
printMsg(lvlError, format("psql connections = %1%") % dbPool.count());
}

View file

@ -133,8 +133,9 @@ sub getDrvLogPath {
my $base = basename $drvPath;
my $bucketed = substr($base, 0, 2) . "/" . substr($base, 2);
my $fn = ($ENV{NIX_LOG_DIR} || "/nix/var/log/nix") . "/drvs/";
for ($fn . $bucketed . ".bz2", $fn . $bucketed, $fn . $base . ".bz2", $fn . $base) {
return $_ if (-f $_);
my $fn2 = Hydra::Model::DB::getHydraPath . "/build-logs/";
for ($fn2 . $bucketed, $fn . $bucketed . ".bz2", $fn . $bucketed, $fn . $base . ".bz2", $fn . $base) {
return $_ if -f $_;
}
return undef;
}