hydra-queue-runner: Use substitutes

This allows Hydra to use binaries from available binary caches. It
makes the queue monitor thread quite a bit slower, so if you don't
want to use binary caches, it's better to add "--option
build-use-substitutes false" to the hydra-queue-runner invocation.

Fixed #243.
This commit is contained in:
Eelco Dolstra 2015-10-05 14:57:44 +02:00
parent ae2cc61be6
commit 82504fe010
3 changed files with 87 additions and 11 deletions

View file

@ -168,19 +168,29 @@ void State::clearBusy(Connection & conn, time_t stopTime)
} }
int State::createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step, int State::allocBuildStep(pqxx::work & txn, Build::ptr build)
const std::string & machine, BuildStepStatus status, const std::string & errorMsg, BuildID propagatedFrom)
{ {
/* Acquire an exclusive lock on BuildSteps to ensure that we don't /* Acquire an exclusive lock on BuildSteps to ensure that we don't
race with other threads creating a step of the same build. */ race with other threads creating a step of the same build. */
txn.exec("lock table BuildSteps in exclusive mode"); txn.exec("lock table BuildSteps in exclusive mode");
auto res = txn.parameterized("select max(stepnr) from BuildSteps where build = $1")(build->id).exec(); auto res = txn.parameterized("select max(stepnr) from BuildSteps where build = $1")(build->id).exec();
int stepNr = res[0][0].is_null() ? 1 : res[0][0].as<int>() + 1; return res[0][0].is_null() ? 1 : res[0][0].as<int>() + 1;
}
int State::createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step,
const std::string & machine, BuildStepStatus status, const std::string & errorMsg, BuildID propagatedFrom)
{
int stepNr = allocBuildStep(txn, build);
txn.parameterized txn.parameterized
("insert into BuildSteps (build, stepnr, type, drvPath, busy, startTime, system, status, propagatedFrom, errorMsg, stopTime, machine) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)") ("insert into BuildSteps (build, stepnr, type, drvPath, busy, startTime, system, status, propagatedFrom, errorMsg, stopTime, machine) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)")
(build->id)(stepNr)(0)(step->drvPath)(status == bssBusy ? 1 : 0) (build->id)
(stepNr)
(0) // == build
(step->drvPath)
(status == bssBusy ? 1 : 0)
(startTime, startTime != 0) (startTime, startTime != 0)
(step->drv.platform) (step->drv.platform)
((int) status, status != bssBusy) ((int) status, status != bssBusy)
@ -213,6 +223,30 @@ void State::finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime,
} }
int State::createSubstitutionStep(pqxx::work & txn, time_t startTime, time_t stopTime,
Build::ptr build, const Path & drvPath, const string & outputName, const Path & storePath)
{
int stepNr = allocBuildStep(txn, build);
txn.parameterized
("insert into BuildSteps (build, stepnr, type, drvPath, busy, status, startTime, stopTime) values ($1, $2, $3, $4, $5, $6, $7, $8)")
(build->id)
(stepNr)
(1) // == substitution
(drvPath)
(0)
(0)
(startTime)
(stopTime).exec();
txn.parameterized
("insert into BuildStepOutputs (build, stepnr, name, path) values ($1, $2, $3, $4)")
(build->id)(stepNr)(outputName)(storePath).exec();
return stepNr;
}
/* Get the steps and unfinished builds that depend on the given step. */ /* Get the steps and unfinished builds that depend on the given step. */
void getDependents(Step::ptr step, std::set<Build::ptr> & builds, std::set<Step::ptr> & steps) void getDependents(Step::ptr step, std::set<Build::ptr> & builds, std::set<Step::ptr> & steps)
{ {
@ -687,7 +721,6 @@ int main(int argc, char * * argv)
}); });
settings.buildVerbosity = lvlVomit; settings.buildVerbosity = lvlVomit;
settings.useSubstitutes = false;
settings.lockCPU = false; settings.lockCPU = false;
State state; State state;

View file

@ -1,5 +1,7 @@
#include "state.hh" #include "state.hh"
#include "build-result.hh" #include "build-result.hh"
#include "globals.hh"
using namespace nix; using namespace nix;
@ -131,7 +133,7 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store,
std::set<Step::ptr> newSteps; std::set<Step::ptr> newSteps;
std::set<Path> finishedDrvs; // FIXME: re-use? std::set<Path> finishedDrvs; // FIXME: re-use?
Step::ptr step = createStep(store, build->drvPath, build, 0, finishedDrvs, newSteps, newRunnable); Step::ptr step = createStep(store, conn, build, build->drvPath, build, 0, finishedDrvs, newSteps, newRunnable);
/* Some of the new steps may be the top level of builds that /* Some of the new steps may be the top level of builds that
we haven't processed yet. So do them now. This ensures that we haven't processed yet. So do them now. This ensures that
@ -298,7 +300,8 @@ void State::processQueueChange(Connection & conn)
} }
Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath, Step::ptr State::createStep(std::shared_ptr<StoreAPI> store,
Connection & conn, Build::ptr build, const Path & drvPath,
Build::ptr referringBuild, Step::ptr referringStep, std::set<Path> & finishedDrvs, Build::ptr referringBuild, Step::ptr referringStep, std::set<Path> & finishedDrvs,
std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable) std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable)
{ {
@ -376,12 +379,46 @@ Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPat
/* Are all outputs valid? */ /* Are all outputs valid? */
bool valid = true; bool valid = true;
for (auto & i : step->drv.outputs) { PathSet outputs = outputPaths(step->drv);
DerivationOutputs missing;
PathSet missingPaths;
for (auto & i : step->drv.outputs)
if (!store->isValidPath(i.second.path)) { if (!store->isValidPath(i.second.path)) {
valid = false;
missing[i.first] = i.second;
missingPaths.insert(i.second.path);
}
/* Try to substitute the missing paths. Note: can't use the more
efficient querySubstitutablePaths() here because upstream Hydra
servers don't allow it (they have "WantMassQuery: 0"). */
assert(missing.size() == missingPaths.size());
if (!missing.empty() && settings.useSubstitutes) {
SubstitutablePathInfos infos;
store->querySubstitutablePathInfos(missingPaths, infos);
if (infos.size() == missingPaths.size()) {
valid = true;
for (auto & i : missing) {
try {
printMsg(lvlInfo, format("substituting output %1% of %2%") % i.second.path % drvPath);
time_t startTime = time(0);
store->ensurePath(i.second.path);
time_t stopTime = time(0);
{
pqxx::work txn(conn);
createSubstitutionStep(txn, startTime, stopTime, build, drvPath, "out", i.second.path);
txn.commit();
}
} catch (Error & e) {
valid = false; valid = false;
break; break;
} }
} }
}
}
// FIXME: check whether all outputs are in the binary cache. // FIXME: check whether all outputs are in the binary cache.
if (valid) { if (valid) {
@ -395,7 +432,7 @@ Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPat
/* Create steps for the dependencies. */ /* Create steps for the dependencies. */
for (auto & i : step->drv.inputDrvs) { for (auto & i : step->drv.inputDrvs) {
auto dep = createStep(store, i.first, 0, step, finishedDrvs, newSteps, newRunnable); auto dep = createStep(store, conn, build, i.first, 0, step, finishedDrvs, newSteps, newRunnable);
if (dep) { if (dep) {
auto step_(step->state.lock()); auto step_(step->state.lock());
step_->deps.insert(dep); step_->deps.insert(dep);

View file

@ -357,6 +357,8 @@ private:
/* Thread to reload /etc/nix/machines periodically. */ /* Thread to reload /etc/nix/machines periodically. */
void monitorMachinesFile(); void monitorMachinesFile();
int allocBuildStep(pqxx::work & txn, Build::ptr build);
int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step, int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step,
const std::string & machine, BuildStepStatus status, const std::string & errorMsg = "", const std::string & machine, BuildStepStatus status, const std::string & errorMsg = "",
BuildID propagatedFrom = 0); BuildID propagatedFrom = 0);
@ -365,6 +367,9 @@ private:
const std::string & machine, BuildStepStatus status, const std::string & errorMsg = "", const std::string & machine, BuildStepStatus status, const std::string & errorMsg = "",
BuildID propagatedFrom = 0); BuildID propagatedFrom = 0);
int createSubstitutionStep(pqxx::work & txn, time_t startTime, time_t stopTime,
Build::ptr build, const nix::Path & drvPath, const std::string & outputName, const nix::Path & storePath);
void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status); void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status);
void queueMonitor(); void queueMonitor();
@ -377,7 +382,8 @@ private:
/* Handle cancellation, deletion and priority bumps. */ /* Handle cancellation, deletion and priority bumps. */
void processQueueChange(Connection & conn); void processQueueChange(Connection & conn);
Step::ptr createStep(std::shared_ptr<nix::StoreAPI> store, const nix::Path & drvPath, Step::ptr createStep(std::shared_ptr<nix::StoreAPI> store,
Connection & conn, Build::ptr build, const nix::Path & drvPath,
Build::ptr referringBuild, Step::ptr referringStep, std::set<nix::Path> & finishedDrvs, Build::ptr referringBuild, Step::ptr referringStep, std::set<nix::Path> & finishedDrvs,
std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable); std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable);