Prepare for CA derivation support with lower impact changes

This is just C++ changes without any Perl / Frontend / SQL Schema
changes.

The idea is that it should be possible to redeploy Hydra with these
chnages with (a) no schema migration and also (b) no regressions. We
should be able to much more safely deploy these to a staging server and
then production `hydra.nixos.org`.

Extracted from #875

Co-Authored-By: Théophane Hufschmitt <theophane.hufschmitt@tweag.io>
Co-Authored-By: Alexander Sosedkin <monk@unboiled.info>
Co-Authored-By: Andrea Ciceri <andrea.ciceri@autistici.org>
Co-Authored-By: Charlotte 🦝 Delenk Mlotte@chir.rs>
Co-Authored-By: Sandro Jäckel <sandro.jaeckel@gmail.com>
This commit is contained in:
John Ericson 2023-12-04 16:05:50 -05:00
parent a5d44b60ea
commit 9ba4417940
8 changed files with 213 additions and 107 deletions

View file

@ -178,7 +178,11 @@ static void worker(
if (auto drv = getDerivation(state, *v, false)) {
DrvInfo::Outputs outputs = drv->queryOutputs();
// CA derivations do not have static output paths, so we
// have to defensively not query output paths in case we
// encounter one.
DrvInfo::Outputs outputs = drv->queryOutputs(
!experimentalFeatureSettings.isEnabled(Xp::CaDerivations));
if (drv->querySystem() == "unknown")
throw EvalError("derivation must have a 'system' attribute");
@ -239,12 +243,12 @@ static void worker(
}
nlohmann::json out;
for (auto & j : outputs)
// FIXME: handle CA/impure builds.
if (j.second)
out[j.first] = state.store->printStorePath(*j.second);
for (auto & [outputName, optOutputPath] : outputs)
if (optOutputPath)
out[outputName] = state.store->printStorePath(*optOutputPath);
else
out[outputName] = "";
job["outputs"] = std::move(out);
reply["job"] = std::move(job);
}

View file

@ -182,6 +182,41 @@ static StorePaths reverseTopoSortPaths(const std::map<StorePath, ValidPathInfo>
return sorted;
}
/**
* Replace the input derivations by their output paths to send a minimal closure
* to the builder.
*
* If we can afford it, resolve it, so that the newly generated derivation still
* has some sensible output paths.
*/
BasicDerivation inlineInputDerivations(Store & store, Derivation & drv, const StorePath & drvPath)
{
BasicDerivation ret;
auto outputHashes = staticOutputHashes(store, drv);
if (!drv.type().hasKnownOutputPaths()) {
auto maybeBasicDrv = drv.tryResolve(store);
if (!maybeBasicDrv)
throw Error(
"the derivation '%s' cant be resolved. Its probably "
"missing some outputs",
store.printStorePath(drvPath));
ret = *maybeBasicDrv;
} else {
// If the derivation is a real `InputAddressed` derivation, we must
// resolve it manually to keep the original output paths
ret = BasicDerivation(drv);
for (auto & [drvPath, node] : drv.inputDrvs.map) {
auto drv2 = store.readDerivation(drvPath);
auto drv2Outputs = drv2.outputsAndOptPaths(store);
for (auto & name : node.value) {
auto inputPath = drv2Outputs.at(name);
ret.inputSrcs.insert(*inputPath.second);
}
}
}
return ret;
}
static std::pair<Path, AutoCloseFD> openLogFile(const std::string & logDir, const StorePath & drvPath)
{
std::string base(drvPath.to_string());
@ -228,17 +263,7 @@ static BasicDerivation sendInputs(
counter & nrStepsCopyingTo
)
{
BasicDerivation basicDrv(*step.drv);
for (const auto & [drvPath, node] : step.drv->inputDrvs.map) {
auto drv2 = localStore.readDerivation(drvPath);
for (auto & name : node.value) {
if (auto i = get(drv2.outputs, name)) {
auto outPath = i->path(localStore, drv2.name, name);
basicDrv.inputSrcs.insert(*outPath);
}
}
}
BasicDerivation basicDrv = inlineInputDerivations(localStore, *step.drv, step.drvPath);
/* Ensure that the inputs exist in the destination store. This is
a no-op for regular stores, but for the binary cache store,
@ -323,8 +348,33 @@ static BuildResult performBuild(
result.stopTime = stop;
}
}
// Get the newly built outputs, either from the remote if it
// supports it, or by introspecting the derivation if the remote is
// too old.
if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 6) {
ServeProto::Serialise<DrvOutputs>::read(localStore, conn);
auto builtOutputs = ServeProto::Serialise<DrvOutputs>::read(localStore, conn);
for (auto && [output, realisation] : builtOutputs)
result.builtOutputs.insert_or_assign(
std::move(output.outputName),
std::move(realisation));
} else {
// If the remote is too old to handle CA derivations, we cant get this
// far anyways
assert(drv.type().hasKnownOutputPaths());
DerivationOutputsAndOptPaths drvOutputs = drv.outputsAndOptPaths(localStore);
auto outputHashes = staticOutputHashes(localStore, drv);
for (auto & [outputName, output] : drvOutputs) {
auto outputPath = output.second;
// Weve just asserted that the output paths of the derivation
// were known
assert(outputPath);
auto outputHash = outputHashes.at(outputName);
auto drvOutput = DrvOutput { outputHash, outputName };
result.builtOutputs.insert_or_assign(
std::move(outputName),
Realisation { drvOutput, *outputPath });
}
}
return result;
@ -376,6 +426,7 @@ static void copyPathFromRemote(
const ValidPathInfo & info
)
{
for (auto * store : {&destStore, &localStore}) {
/* Receive the NAR from the remote and add it to the
destination store. Meanwhile, extract all the info from the
NAR that getBuildOutput() needs. */
@ -395,7 +446,8 @@ static void copyPathFromRemote(
extractNarData(tee, localStore.printStorePath(info.path), narMembers);
});
destStore.addToStore(info, *source2, NoRepair, NoCheckSigs);
store->addToStore(info, *source2, NoRepair, NoCheckSigs);
}
}
static void copyPathsFromRemote(
@ -592,6 +644,10 @@ void State::buildRemote(ref<Store> destStore,
result.logFile = "";
}
StorePathSet outputs;
for (auto & [_, realisation] : buildResult.builtOutputs)
outputs.insert(realisation.outPath);
/* Copy the output paths. */
if (!machine->isLocalhost() || localStore != std::shared_ptr<Store>(destStore)) {
updateStep(ssReceivingOutputs);
@ -600,12 +656,6 @@ void State::buildRemote(ref<Store> destStore,
auto now1 = std::chrono::steady_clock::now();
StorePathSet outputs;
for (auto & i : step->drv->outputsAndOptPaths(*localStore)) {
if (i.second.second)
outputs.insert(*i.second.second);
}
size_t totalNarSize = 0;
auto infos = build_remote::queryPathInfos(conn, *localStore, outputs, totalNarSize);
@ -624,6 +674,23 @@ void State::buildRemote(ref<Store> destStore,
result.overhead += std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
}
/* Register the outputs of the newly built drv */
if (experimentalFeatureSettings.isEnabled(Xp::CaDerivations)) {
auto outputHashes = staticOutputHashes(*localStore, *step->drv);
for (auto & [outputName, realisation] : buildResult.builtOutputs) {
// Register the resolved drv output
localStore->registerDrvOutput(realisation);
destStore->registerDrvOutput(realisation);
// Also register the unresolved one
auto unresolvedRealisation = realisation;
unresolvedRealisation.signatures.clear();
unresolvedRealisation.id.drvHash = outputHashes.at(outputName);
localStore->registerDrvOutput(unresolvedRealisation);
destStore->registerDrvOutput(unresolvedRealisation);
}
}
/* Shut down the connection. */
child.to = -1;
child.pid.wait();

View file

@ -11,18 +11,18 @@ using namespace nix;
BuildOutput getBuildOutput(
nix::ref<Store> store,
NarMemberDatas & narMembers,
const Derivation & drv)
const OutputPathMap derivationOutputs)
{
BuildOutput res;
/* Compute the closure size. */
StorePathSet outputs;
StorePathSet closure;
for (auto & i : drv.outputsAndOptPaths(*store))
if (i.second.second) {
store->computeFSClosure(*i.second.second, closure);
outputs.insert(*i.second.second);
}
for (auto& [outputName, outputPath] : derivationOutputs) {
store->computeFSClosure(outputPath, closure);
outputs.insert(outputPath);
res.outputs.insert({outputName, outputPath});
}
for (auto & path : closure) {
auto info = store->queryPathInfo(path);
res.closureSize += info->narSize;
@ -107,13 +107,12 @@ BuildOutput getBuildOutput(
/* If no build products were explicitly declared, then add all
outputs as a product of type "nix-build". */
if (!explicitProducts) {
for (auto & [name, output] : drv.outputs) {
for (auto & [name, output] : derivationOutputs) {
BuildProduct product;
auto outPath = output.path(*store, drv.name, name);
product.path = store->printStorePath(*outPath);
product.path = store->printStorePath(output);
product.type = "nix-build";
product.subtype = name == "out" ? "" : name;
product.name = outPath->name();
product.name = output.name();
auto file = narMembers.find(product.path);
assert(file != narMembers.end());

View file

@ -223,7 +223,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
if (result.stepStatus == bsSuccess) {
updateStep(ssPostProcessing);
res = getBuildOutput(destStore, narMembers, *step->drv);
res = getBuildOutput(destStore, narMembers, localStore->queryDerivationOutputMap(step->drvPath));
}
}
@ -277,9 +277,9 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
assert(stepNr);
for (auto & i : step->drv->outputsAndOptPaths(*localStore)) {
if (i.second.second)
addRoot(*i.second.second);
for (auto & i : localStore->queryPartialDerivationOutputMap(step->drvPath)) {
if (i.second)
addRoot(*i.second);
}
/* Register success in the database for all Build objects that

View file

@ -36,10 +36,12 @@ struct BuildOutput
std::list<BuildProduct> products;
std::map<std::string, nix::StorePath> outputs;
std::map<std::string, BuildMetric> metrics;
};
BuildOutput getBuildOutput(
nix::ref<nix::Store> store,
NarMemberDatas & narMembers,
const nix::Derivation & drv);
const nix::OutputPathMap derivationOutputs);

View file

@ -312,10 +312,10 @@ unsigned int State::createBuildStep(pqxx::work & txn, time_t startTime, BuildID
if (r.affected_rows() == 0) goto restart;
for (auto & [name, output] : step->drv->outputs)
for (auto & [name, output] : localStore->queryPartialDerivationOutputMap(step->drvPath))
txn.exec_params0
("insert into BuildStepOutputs (build, stepnr, name, path) values ($1, $2, $3, $4)",
buildId, stepNr, name, localStore->printStorePath(*output.path(*localStore, step->drv->name, name)));
buildId, stepNr, name, output ? localStore->printStorePath(*output) : "");
if (status == bsBusy)
txn.exec(fmt("notify step_started, '%d\t%d'", buildId, stepNr));
@ -352,11 +352,23 @@ void State::finishBuildStep(pqxx::work & txn, const RemoteResult & result,
assert(result.logFile.find('\t') == std::string::npos);
txn.exec(fmt("notify step_finished, '%d\t%d\t%s'",
buildId, stepNr, result.logFile));
if (result.stepStatus == bsSuccess) {
// Update the corresponding `BuildStepOutputs` row to add the output path
auto res = txn.exec_params1("select drvPath from BuildSteps where build = $1 and stepnr = $2", buildId, stepNr);
assert(res.size());
StorePath drvPath = localStore->parseStorePath(res[0].as<std::string>());
// If we've finished building, all the paths should be known
for (auto & [name, output] : localStore->queryDerivationOutputMap(drvPath))
txn.exec_params0
("update BuildStepOutputs set path = $4 where build = $1 and stepnr = $2 and name = $3",
buildId, stepNr, name, localStore->printStorePath(output));
}
}
int State::createSubstitutionStep(pqxx::work & txn, time_t startTime, time_t stopTime,
Build::ptr build, const StorePath & drvPath, const std::string & outputName, const StorePath & storePath)
Build::ptr build, const StorePath & drvPath, const nix::Derivation drv, const std::string & outputName, const StorePath & storePath)
{
restart:
auto stepNr = allocBuildStep(txn, build->id);
@ -457,6 +469,15 @@ void State::markSucceededBuild(pqxx::work & txn, Build::ptr build,
res.releaseName != "" ? std::make_optional(res.releaseName) : std::nullopt,
isCachedBuild ? 1 : 0);
for (auto & [outputName, outputPath] : res.outputs) {
txn.exec_params0
("update BuildOutputs set path = $3 where build = $1 and name = $2",
build->id,
outputName,
localStore->printStorePath(outputPath)
);
}
txn.exec_params0("delete from BuildProducts where build = $1", build->id);
unsigned int productNr = 1;

View file

@ -192,15 +192,14 @@ bool State::getQueuedBuilds(Connection & conn,
if (!res[0].is_null()) propagatedFrom = res[0].as<BuildID>();
if (!propagatedFrom) {
for (auto & i : ex.step->drv->outputsAndOptPaths(*localStore)) {
if (i.second.second) {
auto res = txn.exec_params
("select max(s.build) from BuildSteps s join BuildStepOutputs o on s.build = o.build where path = $1 and startTime != 0 and stopTime != 0 and status = 1",
localStore->printStorePath(*i.second.second));
if (!res[0][0].is_null()) {
propagatedFrom = res[0][0].as<BuildID>();
break;
}
for (auto & i : localStore->queryPartialDerivationOutputMap(ex.step->drvPath)) {
auto res = txn.exec_params
("select max(s.build) from BuildSteps s join BuildStepOutputs o on s.build = o.build where drvPath = $1 and name = $2 and startTime != 0 and stopTime != 0 and status = 1",
localStore->printStorePath(ex.step->drvPath),
i.first);
if (!res[0][0].is_null()) {
propagatedFrom = res[0][0].as<BuildID>();
break;
}
}
}
@ -236,12 +235,10 @@ bool State::getQueuedBuilds(Connection & conn,
/* If we didn't get a step, it means the step's outputs are
all valid. So we mark this as a finished, cached build. */
if (!step) {
auto drv = localStore->readDerivation(build->drvPath);
BuildOutput res = getBuildOutputCached(conn, destStore, drv);
BuildOutput res = getBuildOutputCached(conn, destStore, build->drvPath);
for (auto & i : drv.outputsAndOptPaths(*localStore))
if (i.second.second)
addRoot(*i.second.second);
for (auto & i : localStore->queryDerivationOutputMap(build->drvPath))
addRoot(i.second);
{
auto mc = startDbUpdate();
@ -481,26 +478,39 @@ Step::ptr State::createStep(ref<Store> destStore,
throw PreviousFailure{step};
/* Are all outputs valid? */
auto outputHashes = staticOutputHashes(*localStore, *(step->drv));
bool valid = true;
DerivationOutputs missing;
for (auto & i : step->drv->outputs)
if (!destStore->isValidPath(*i.second.path(*localStore, step->drv->name, i.first))) {
valid = false;
missing.insert_or_assign(i.first, i.second);
std::map<DrvOutput, std::optional<StorePath>> missing;
for (auto &[outputName, maybeOutputPath] : step->drv->outputsAndOptPaths(*destStore)) {
auto outputHash = outputHashes.at(outputName);
if (maybeOutputPath.second) {
if (!destStore->isValidPath(*maybeOutputPath.second)) {
valid = false;
missing.insert({{outputHash, outputName}, maybeOutputPath.second});
}
} else {
experimentalFeatureSettings.require(Xp::CaDerivations);
if (!destStore->queryRealisation(DrvOutput{outputHash, outputName})) {
valid = false;
missing.insert({{outputHash, outputName}, std::nullopt});
}
}
}
/* Try to copy the missing paths from the local store or from
substitutes. */
if (!missing.empty()) {
size_t avail = 0;
for (auto & i : missing) {
auto path = i.second.path(*localStore, step->drv->name, i.first);
if (/* localStore != destStore && */ localStore->isValidPath(*path))
for (auto & [i, maybePath] : missing) {
if ((maybePath && localStore->isValidPath(*maybePath)))
avail++;
else if (useSubstitutes) {
else if (experimentalFeatureSettings.isEnabled(Xp::CaDerivations) && localStore->queryRealisation(i)) {
maybePath = localStore->queryRealisation(i)->outPath;
avail++;
} else if (useSubstitutes && maybePath) {
SubstitutablePathInfos infos;
localStore->querySubstitutablePathInfos({{*path, {}}}, infos);
localStore->querySubstitutablePathInfos({{*maybePath, {}}}, infos);
if (infos.size() == 1)
avail++;
}
@ -508,44 +518,44 @@ Step::ptr State::createStep(ref<Store> destStore,
if (missing.size() == avail) {
valid = true;
for (auto & i : missing) {
auto path = i.second.path(*localStore, step->drv->name, i.first);
for (auto & [i, path] : missing) {
if (path) {
try {
time_t startTime = time(0);
try {
time_t startTime = time(0);
if (localStore->isValidPath(*path))
printInfo("copying output %1% of %2% from local store",
localStore->printStorePath(*path),
localStore->printStorePath(drvPath));
else {
printInfo("substituting output %1% of %2%",
localStore->printStorePath(*path),
localStore->printStorePath(drvPath));
localStore->ensurePath(*path);
// FIXME: should copy directly from substituter to destStore.
}
if (localStore->isValidPath(*path))
printInfo("copying output %1% of %2% from local store",
StorePathSet closure;
localStore->computeFSClosure({*path}, closure);
copyPaths(*localStore, *destStore, closure, NoRepair, CheckSigs, NoSubstitute);
time_t stopTime = time(0);
{
auto mc = startDbUpdate();
pqxx::work txn(conn);
createSubstitutionStep(txn, startTime, stopTime, build, drvPath, *(step->drv), "out", *path);
txn.commit();
}
} catch (Error & e) {
printError("while copying/substituting output %s of %s: %s",
localStore->printStorePath(*path),
localStore->printStorePath(drvPath));
else {
printInfo("substituting output %1% of %2%",
localStore->printStorePath(*path),
localStore->printStorePath(drvPath));
localStore->ensurePath(*path);
// FIXME: should copy directly from substituter to destStore.
localStore->printStorePath(drvPath),
e.what());
valid = false;
break;
}
copyClosure(*localStore, *destStore,
StorePathSet { *path },
NoRepair, CheckSigs, NoSubstitute);
time_t stopTime = time(0);
{
auto mc = startDbUpdate();
pqxx::work txn(conn);
createSubstitutionStep(txn, startTime, stopTime, build, drvPath, "out", *path);
txn.commit();
}
} catch (Error & e) {
printError("while copying/substituting output %s of %s: %s",
localStore->printStorePath(*path),
localStore->printStorePath(drvPath),
e.what());
valid = false;
break;
}
}
}
@ -640,17 +650,20 @@ void State::processJobsetSharesChange(Connection & conn)
}
BuildOutput State::getBuildOutputCached(Connection & conn, nix::ref<nix::Store> destStore, const nix::Derivation & drv)
BuildOutput State::getBuildOutputCached(Connection & conn, nix::ref<nix::Store> destStore, const nix::StorePath & drvPath)
{
auto derivationOutputs = localStore->queryDerivationOutputMap(drvPath);
{
pqxx::work txn(conn);
for (auto & [name, output] : drv.outputsAndOptPaths(*localStore)) {
for (auto & [name, output] : derivationOutputs) {
auto r = txn.exec_params
("select id, buildStatus, releaseName, closureSize, size from Builds b "
"join BuildOutputs o on b.id = o.build "
"where finished = 1 and (buildStatus = 0 or buildStatus = 6) and path = $1",
localStore->printStorePath(*output.second));
localStore->printStorePath(output));
if (r.empty()) continue;
BuildID id = r[0][0].as<BuildID>();
@ -704,5 +717,5 @@ BuildOutput State::getBuildOutputCached(Connection & conn, nix::ref<nix::Store>
}
NarMemberDatas narMembers;
return getBuildOutput(destStore, narMembers, drv);
return getBuildOutput(destStore, narMembers, derivationOutputs);
}

View file

@ -520,7 +520,7 @@ private:
const std::string & machine);
int createSubstitutionStep(pqxx::work & txn, time_t startTime, time_t stopTime,
Build::ptr build, const nix::StorePath & drvPath, const std::string & outputName, const nix::StorePath & storePath);
Build::ptr build, const nix::StorePath & drvPath, const nix::Derivation drv, const std::string & outputName, const nix::StorePath & storePath);
void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status);
@ -536,7 +536,7 @@ private:
void processQueueChange(Connection & conn);
BuildOutput getBuildOutputCached(Connection & conn, nix::ref<nix::Store> destStore,
const nix::Derivation & drv);
const nix::StorePath & drvPath);
Step::ptr createStep(nix::ref<nix::Store> store,
Connection & conn, Build::ptr build, const nix::StorePath & drvPath,