From 9ba4417940ffdd0fadea43f68c61ef948a4b8d39 Mon Sep 17 00:00:00 2001 From: John Ericson Date: Mon, 4 Dec 2023 16:05:50 -0500 Subject: [PATCH] Prepare for CA derivation support with lower impact changes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is just C++ changes without any Perl / Frontend / SQL Schema changes. The idea is that it should be possible to redeploy Hydra with these chnages with (a) no schema migration and also (b) no regressions. We should be able to much more safely deploy these to a staging server and then production `hydra.nixos.org`. Extracted from #875 Co-Authored-By: Théophane Hufschmitt Co-Authored-By: Alexander Sosedkin Co-Authored-By: Andrea Ciceri Co-Authored-By: Charlotte 🦝 Delenk Mlotte@chir.rs> Co-Authored-By: Sandro Jäckel --- src/hydra-eval-jobs/hydra-eval-jobs.cc | 16 ++- src/hydra-queue-runner/build-remote.cc | 105 +++++++++++--- src/hydra-queue-runner/build-result.cc | 19 ++- src/hydra-queue-runner/builder.cc | 8 +- src/hydra-queue-runner/hydra-build-result.hh | 4 +- src/hydra-queue-runner/hydra-queue-runner.cc | 27 +++- src/hydra-queue-runner/queue-monitor.cc | 137 ++++++++++--------- src/hydra-queue-runner/state.hh | 4 +- 8 files changed, 213 insertions(+), 107 deletions(-) diff --git a/src/hydra-eval-jobs/hydra-eval-jobs.cc b/src/hydra-eval-jobs/hydra-eval-jobs.cc index 2fe2c80f..d007189d 100644 --- a/src/hydra-eval-jobs/hydra-eval-jobs.cc +++ b/src/hydra-eval-jobs/hydra-eval-jobs.cc @@ -178,7 +178,11 @@ static void worker( if (auto drv = getDerivation(state, *v, false)) { - DrvInfo::Outputs outputs = drv->queryOutputs(); + // CA derivations do not have static output paths, so we + // have to defensively not query output paths in case we + // encounter one. + DrvInfo::Outputs outputs = drv->queryOutputs( + !experimentalFeatureSettings.isEnabled(Xp::CaDerivations)); if (drv->querySystem() == "unknown") throw EvalError("derivation must have a 'system' attribute"); @@ -239,12 +243,12 @@ static void worker( } nlohmann::json out; - for (auto & j : outputs) - // FIXME: handle CA/impure builds. - if (j.second) - out[j.first] = state.store->printStorePath(*j.second); + for (auto & [outputName, optOutputPath] : outputs) + if (optOutputPath) + out[outputName] = state.store->printStorePath(*optOutputPath); + else + out[outputName] = ""; job["outputs"] = std::move(out); - reply["job"] = std::move(job); } diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index 73f46a53..26f6d63f 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -182,6 +182,41 @@ static StorePaths reverseTopoSortPaths(const std::map return sorted; } +/** + * Replace the input derivations by their output paths to send a minimal closure + * to the builder. + * + * If we can afford it, resolve it, so that the newly generated derivation still + * has some sensible output paths. + */ +BasicDerivation inlineInputDerivations(Store & store, Derivation & drv, const StorePath & drvPath) +{ + BasicDerivation ret; + auto outputHashes = staticOutputHashes(store, drv); + if (!drv.type().hasKnownOutputPaths()) { + auto maybeBasicDrv = drv.tryResolve(store); + if (!maybeBasicDrv) + throw Error( + "the derivation '%s' can’t be resolved. It’s probably " + "missing some outputs", + store.printStorePath(drvPath)); + ret = *maybeBasicDrv; + } else { + // If the derivation is a real `InputAddressed` derivation, we must + // resolve it manually to keep the original output paths + ret = BasicDerivation(drv); + for (auto & [drvPath, node] : drv.inputDrvs.map) { + auto drv2 = store.readDerivation(drvPath); + auto drv2Outputs = drv2.outputsAndOptPaths(store); + for (auto & name : node.value) { + auto inputPath = drv2Outputs.at(name); + ret.inputSrcs.insert(*inputPath.second); + } + } + } + return ret; +} + static std::pair openLogFile(const std::string & logDir, const StorePath & drvPath) { std::string base(drvPath.to_string()); @@ -228,17 +263,7 @@ static BasicDerivation sendInputs( counter & nrStepsCopyingTo ) { - BasicDerivation basicDrv(*step.drv); - - for (const auto & [drvPath, node] : step.drv->inputDrvs.map) { - auto drv2 = localStore.readDerivation(drvPath); - for (auto & name : node.value) { - if (auto i = get(drv2.outputs, name)) { - auto outPath = i->path(localStore, drv2.name, name); - basicDrv.inputSrcs.insert(*outPath); - } - } - } + BasicDerivation basicDrv = inlineInputDerivations(localStore, *step.drv, step.drvPath); /* Ensure that the inputs exist in the destination store. This is a no-op for regular stores, but for the binary cache store, @@ -323,8 +348,33 @@ static BuildResult performBuild( result.stopTime = stop; } } + + // Get the newly built outputs, either from the remote if it + // supports it, or by introspecting the derivation if the remote is + // too old. if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 6) { - ServeProto::Serialise::read(localStore, conn); + auto builtOutputs = ServeProto::Serialise::read(localStore, conn); + for (auto && [output, realisation] : builtOutputs) + result.builtOutputs.insert_or_assign( + std::move(output.outputName), + std::move(realisation)); + } else { + // If the remote is too old to handle CA derivations, we can’t get this + // far anyways + assert(drv.type().hasKnownOutputPaths()); + DerivationOutputsAndOptPaths drvOutputs = drv.outputsAndOptPaths(localStore); + auto outputHashes = staticOutputHashes(localStore, drv); + for (auto & [outputName, output] : drvOutputs) { + auto outputPath = output.second; + // We’ve just asserted that the output paths of the derivation + // were known + assert(outputPath); + auto outputHash = outputHashes.at(outputName); + auto drvOutput = DrvOutput { outputHash, outputName }; + result.builtOutputs.insert_or_assign( + std::move(outputName), + Realisation { drvOutput, *outputPath }); + } } return result; @@ -376,6 +426,7 @@ static void copyPathFromRemote( const ValidPathInfo & info ) { + for (auto * store : {&destStore, &localStore}) { /* Receive the NAR from the remote and add it to the destination store. Meanwhile, extract all the info from the NAR that getBuildOutput() needs. */ @@ -395,7 +446,8 @@ static void copyPathFromRemote( extractNarData(tee, localStore.printStorePath(info.path), narMembers); }); - destStore.addToStore(info, *source2, NoRepair, NoCheckSigs); + store->addToStore(info, *source2, NoRepair, NoCheckSigs); + } } static void copyPathsFromRemote( @@ -592,6 +644,10 @@ void State::buildRemote(ref destStore, result.logFile = ""; } + StorePathSet outputs; + for (auto & [_, realisation] : buildResult.builtOutputs) + outputs.insert(realisation.outPath); + /* Copy the output paths. */ if (!machine->isLocalhost() || localStore != std::shared_ptr(destStore)) { updateStep(ssReceivingOutputs); @@ -600,12 +656,6 @@ void State::buildRemote(ref destStore, auto now1 = std::chrono::steady_clock::now(); - StorePathSet outputs; - for (auto & i : step->drv->outputsAndOptPaths(*localStore)) { - if (i.second.second) - outputs.insert(*i.second.second); - } - size_t totalNarSize = 0; auto infos = build_remote::queryPathInfos(conn, *localStore, outputs, totalNarSize); @@ -624,6 +674,23 @@ void State::buildRemote(ref destStore, result.overhead += std::chrono::duration_cast(now2 - now1).count(); } + /* Register the outputs of the newly built drv */ + if (experimentalFeatureSettings.isEnabled(Xp::CaDerivations)) { + auto outputHashes = staticOutputHashes(*localStore, *step->drv); + for (auto & [outputName, realisation] : buildResult.builtOutputs) { + // Register the resolved drv output + localStore->registerDrvOutput(realisation); + destStore->registerDrvOutput(realisation); + + // Also register the unresolved one + auto unresolvedRealisation = realisation; + unresolvedRealisation.signatures.clear(); + unresolvedRealisation.id.drvHash = outputHashes.at(outputName); + localStore->registerDrvOutput(unresolvedRealisation); + destStore->registerDrvOutput(unresolvedRealisation); + } + } + /* Shut down the connection. */ child.to = -1; child.pid.wait(); diff --git a/src/hydra-queue-runner/build-result.cc b/src/hydra-queue-runner/build-result.cc index 691c1f19..ffdc37b7 100644 --- a/src/hydra-queue-runner/build-result.cc +++ b/src/hydra-queue-runner/build-result.cc @@ -11,18 +11,18 @@ using namespace nix; BuildOutput getBuildOutput( nix::ref store, NarMemberDatas & narMembers, - const Derivation & drv) + const OutputPathMap derivationOutputs) { BuildOutput res; /* Compute the closure size. */ StorePathSet outputs; StorePathSet closure; - for (auto & i : drv.outputsAndOptPaths(*store)) - if (i.second.second) { - store->computeFSClosure(*i.second.second, closure); - outputs.insert(*i.second.second); - } + for (auto& [outputName, outputPath] : derivationOutputs) { + store->computeFSClosure(outputPath, closure); + outputs.insert(outputPath); + res.outputs.insert({outputName, outputPath}); + } for (auto & path : closure) { auto info = store->queryPathInfo(path); res.closureSize += info->narSize; @@ -107,13 +107,12 @@ BuildOutput getBuildOutput( /* If no build products were explicitly declared, then add all outputs as a product of type "nix-build". */ if (!explicitProducts) { - for (auto & [name, output] : drv.outputs) { + for (auto & [name, output] : derivationOutputs) { BuildProduct product; - auto outPath = output.path(*store, drv.name, name); - product.path = store->printStorePath(*outPath); + product.path = store->printStorePath(output); product.type = "nix-build"; product.subtype = name == "out" ? "" : name; - product.name = outPath->name(); + product.name = output.name(); auto file = narMembers.find(product.path); assert(file != narMembers.end()); diff --git a/src/hydra-queue-runner/builder.cc b/src/hydra-queue-runner/builder.cc index 307eee8e..42983941 100644 --- a/src/hydra-queue-runner/builder.cc +++ b/src/hydra-queue-runner/builder.cc @@ -223,7 +223,7 @@ State::StepResult State::doBuildStep(nix::ref destStore, if (result.stepStatus == bsSuccess) { updateStep(ssPostProcessing); - res = getBuildOutput(destStore, narMembers, *step->drv); + res = getBuildOutput(destStore, narMembers, localStore->queryDerivationOutputMap(step->drvPath)); } } @@ -277,9 +277,9 @@ State::StepResult State::doBuildStep(nix::ref destStore, assert(stepNr); - for (auto & i : step->drv->outputsAndOptPaths(*localStore)) { - if (i.second.second) - addRoot(*i.second.second); + for (auto & i : localStore->queryPartialDerivationOutputMap(step->drvPath)) { + if (i.second) + addRoot(*i.second); } /* Register success in the database for all Build objects that diff --git a/src/hydra-queue-runner/hydra-build-result.hh b/src/hydra-queue-runner/hydra-build-result.hh index a3f71ae9..7d47f67c 100644 --- a/src/hydra-queue-runner/hydra-build-result.hh +++ b/src/hydra-queue-runner/hydra-build-result.hh @@ -36,10 +36,12 @@ struct BuildOutput std::list products; + std::map outputs; + std::map metrics; }; BuildOutput getBuildOutput( nix::ref store, NarMemberDatas & narMembers, - const nix::Derivation & drv); + const nix::OutputPathMap derivationOutputs); diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 1d54bb93..2f2c6091 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -312,10 +312,10 @@ unsigned int State::createBuildStep(pqxx::work & txn, time_t startTime, BuildID if (r.affected_rows() == 0) goto restart; - for (auto & [name, output] : step->drv->outputs) + for (auto & [name, output] : localStore->queryPartialDerivationOutputMap(step->drvPath)) txn.exec_params0 ("insert into BuildStepOutputs (build, stepnr, name, path) values ($1, $2, $3, $4)", - buildId, stepNr, name, localStore->printStorePath(*output.path(*localStore, step->drv->name, name))); + buildId, stepNr, name, output ? localStore->printStorePath(*output) : ""); if (status == bsBusy) txn.exec(fmt("notify step_started, '%d\t%d'", buildId, stepNr)); @@ -352,11 +352,23 @@ void State::finishBuildStep(pqxx::work & txn, const RemoteResult & result, assert(result.logFile.find('\t') == std::string::npos); txn.exec(fmt("notify step_finished, '%d\t%d\t%s'", buildId, stepNr, result.logFile)); + + if (result.stepStatus == bsSuccess) { + // Update the corresponding `BuildStepOutputs` row to add the output path + auto res = txn.exec_params1("select drvPath from BuildSteps where build = $1 and stepnr = $2", buildId, stepNr); + assert(res.size()); + StorePath drvPath = localStore->parseStorePath(res[0].as()); + // If we've finished building, all the paths should be known + for (auto & [name, output] : localStore->queryDerivationOutputMap(drvPath)) + txn.exec_params0 + ("update BuildStepOutputs set path = $4 where build = $1 and stepnr = $2 and name = $3", + buildId, stepNr, name, localStore->printStorePath(output)); + } } int State::createSubstitutionStep(pqxx::work & txn, time_t startTime, time_t stopTime, - Build::ptr build, const StorePath & drvPath, const std::string & outputName, const StorePath & storePath) + Build::ptr build, const StorePath & drvPath, const nix::Derivation drv, const std::string & outputName, const StorePath & storePath) { restart: auto stepNr = allocBuildStep(txn, build->id); @@ -457,6 +469,15 @@ void State::markSucceededBuild(pqxx::work & txn, Build::ptr build, res.releaseName != "" ? std::make_optional(res.releaseName) : std::nullopt, isCachedBuild ? 1 : 0); + for (auto & [outputName, outputPath] : res.outputs) { + txn.exec_params0 + ("update BuildOutputs set path = $3 where build = $1 and name = $2", + build->id, + outputName, + localStore->printStorePath(outputPath) + ); + } + txn.exec_params0("delete from BuildProducts where build = $1", build->id); unsigned int productNr = 1; diff --git a/src/hydra-queue-runner/queue-monitor.cc b/src/hydra-queue-runner/queue-monitor.cc index 6c339af6..cee5b6cc 100644 --- a/src/hydra-queue-runner/queue-monitor.cc +++ b/src/hydra-queue-runner/queue-monitor.cc @@ -192,15 +192,14 @@ bool State::getQueuedBuilds(Connection & conn, if (!res[0].is_null()) propagatedFrom = res[0].as(); if (!propagatedFrom) { - for (auto & i : ex.step->drv->outputsAndOptPaths(*localStore)) { - if (i.second.second) { - auto res = txn.exec_params - ("select max(s.build) from BuildSteps s join BuildStepOutputs o on s.build = o.build where path = $1 and startTime != 0 and stopTime != 0 and status = 1", - localStore->printStorePath(*i.second.second)); - if (!res[0][0].is_null()) { - propagatedFrom = res[0][0].as(); - break; - } + for (auto & i : localStore->queryPartialDerivationOutputMap(ex.step->drvPath)) { + auto res = txn.exec_params + ("select max(s.build) from BuildSteps s join BuildStepOutputs o on s.build = o.build where drvPath = $1 and name = $2 and startTime != 0 and stopTime != 0 and status = 1", + localStore->printStorePath(ex.step->drvPath), + i.first); + if (!res[0][0].is_null()) { + propagatedFrom = res[0][0].as(); + break; } } } @@ -236,12 +235,10 @@ bool State::getQueuedBuilds(Connection & conn, /* If we didn't get a step, it means the step's outputs are all valid. So we mark this as a finished, cached build. */ if (!step) { - auto drv = localStore->readDerivation(build->drvPath); - BuildOutput res = getBuildOutputCached(conn, destStore, drv); + BuildOutput res = getBuildOutputCached(conn, destStore, build->drvPath); - for (auto & i : drv.outputsAndOptPaths(*localStore)) - if (i.second.second) - addRoot(*i.second.second); + for (auto & i : localStore->queryDerivationOutputMap(build->drvPath)) + addRoot(i.second); { auto mc = startDbUpdate(); @@ -481,26 +478,39 @@ Step::ptr State::createStep(ref destStore, throw PreviousFailure{step}; /* Are all outputs valid? */ + auto outputHashes = staticOutputHashes(*localStore, *(step->drv)); bool valid = true; - DerivationOutputs missing; - for (auto & i : step->drv->outputs) - if (!destStore->isValidPath(*i.second.path(*localStore, step->drv->name, i.first))) { - valid = false; - missing.insert_or_assign(i.first, i.second); + std::map> missing; + for (auto &[outputName, maybeOutputPath] : step->drv->outputsAndOptPaths(*destStore)) { + auto outputHash = outputHashes.at(outputName); + if (maybeOutputPath.second) { + if (!destStore->isValidPath(*maybeOutputPath.second)) { + valid = false; + missing.insert({{outputHash, outputName}, maybeOutputPath.second}); + } + } else { + experimentalFeatureSettings.require(Xp::CaDerivations); + if (!destStore->queryRealisation(DrvOutput{outputHash, outputName})) { + valid = false; + missing.insert({{outputHash, outputName}, std::nullopt}); + } } + } /* Try to copy the missing paths from the local store or from substitutes. */ if (!missing.empty()) { size_t avail = 0; - for (auto & i : missing) { - auto path = i.second.path(*localStore, step->drv->name, i.first); - if (/* localStore != destStore && */ localStore->isValidPath(*path)) + for (auto & [i, maybePath] : missing) { + if ((maybePath && localStore->isValidPath(*maybePath))) avail++; - else if (useSubstitutes) { + else if (experimentalFeatureSettings.isEnabled(Xp::CaDerivations) && localStore->queryRealisation(i)) { + maybePath = localStore->queryRealisation(i)->outPath; + avail++; + } else if (useSubstitutes && maybePath) { SubstitutablePathInfos infos; - localStore->querySubstitutablePathInfos({{*path, {}}}, infos); + localStore->querySubstitutablePathInfos({{*maybePath, {}}}, infos); if (infos.size() == 1) avail++; } @@ -508,44 +518,44 @@ Step::ptr State::createStep(ref destStore, if (missing.size() == avail) { valid = true; - for (auto & i : missing) { - auto path = i.second.path(*localStore, step->drv->name, i.first); + for (auto & [i, path] : missing) { + if (path) { + try { + time_t startTime = time(0); - try { - time_t startTime = time(0); + if (localStore->isValidPath(*path)) + printInfo("copying output ‘%1%’ of ‘%2%’ from local store", + localStore->printStorePath(*path), + localStore->printStorePath(drvPath)); + else { + printInfo("substituting output ‘%1%’ of ‘%2%’", + localStore->printStorePath(*path), + localStore->printStorePath(drvPath)); + localStore->ensurePath(*path); + // FIXME: should copy directly from substituter to destStore. + } - if (localStore->isValidPath(*path)) - printInfo("copying output ‘%1%’ of ‘%2%’ from local store", + StorePathSet closure; + localStore->computeFSClosure({*path}, closure); + copyPaths(*localStore, *destStore, closure, NoRepair, CheckSigs, NoSubstitute); + + time_t stopTime = time(0); + + { + auto mc = startDbUpdate(); + pqxx::work txn(conn); + createSubstitutionStep(txn, startTime, stopTime, build, drvPath, *(step->drv), "out", *path); + txn.commit(); + } + + } catch (Error & e) { + printError("while copying/substituting output ‘%s’ of ‘%s’: %s", localStore->printStorePath(*path), - localStore->printStorePath(drvPath)); - else { - printInfo("substituting output ‘%1%’ of ‘%2%’", - localStore->printStorePath(*path), - localStore->printStorePath(drvPath)); - localStore->ensurePath(*path); - // FIXME: should copy directly from substituter to destStore. + localStore->printStorePath(drvPath), + e.what()); + valid = false; + break; } - - copyClosure(*localStore, *destStore, - StorePathSet { *path }, - NoRepair, CheckSigs, NoSubstitute); - - time_t stopTime = time(0); - - { - auto mc = startDbUpdate(); - pqxx::work txn(conn); - createSubstitutionStep(txn, startTime, stopTime, build, drvPath, "out", *path); - txn.commit(); - } - - } catch (Error & e) { - printError("while copying/substituting output ‘%s’ of ‘%s’: %s", - localStore->printStorePath(*path), - localStore->printStorePath(drvPath), - e.what()); - valid = false; - break; } } } @@ -640,17 +650,20 @@ void State::processJobsetSharesChange(Connection & conn) } -BuildOutput State::getBuildOutputCached(Connection & conn, nix::ref destStore, const nix::Derivation & drv) +BuildOutput State::getBuildOutputCached(Connection & conn, nix::ref destStore, const nix::StorePath & drvPath) { + + auto derivationOutputs = localStore->queryDerivationOutputMap(drvPath); + { pqxx::work txn(conn); - for (auto & [name, output] : drv.outputsAndOptPaths(*localStore)) { + for (auto & [name, output] : derivationOutputs) { auto r = txn.exec_params ("select id, buildStatus, releaseName, closureSize, size from Builds b " "join BuildOutputs o on b.id = o.build " "where finished = 1 and (buildStatus = 0 or buildStatus = 6) and path = $1", - localStore->printStorePath(*output.second)); + localStore->printStorePath(output)); if (r.empty()) continue; BuildID id = r[0][0].as(); @@ -704,5 +717,5 @@ BuildOutput State::getBuildOutputCached(Connection & conn, nix::ref } NarMemberDatas narMembers; - return getBuildOutput(destStore, narMembers, drv); + return getBuildOutput(destStore, narMembers, derivationOutputs); } diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index 6359063a..5f01de1e 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -520,7 +520,7 @@ private: const std::string & machine); int createSubstitutionStep(pqxx::work & txn, time_t startTime, time_t stopTime, - Build::ptr build, const nix::StorePath & drvPath, const std::string & outputName, const nix::StorePath & storePath); + Build::ptr build, const nix::StorePath & drvPath, const nix::Derivation drv, const std::string & outputName, const nix::StorePath & storePath); void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status); @@ -536,7 +536,7 @@ private: void processQueueChange(Connection & conn); BuildOutput getBuildOutputCached(Connection & conn, nix::ref destStore, - const nix::Derivation & drv); + const nix::StorePath & drvPath); Step::ptr createStep(nix::ref store, Connection & conn, Build::ptr build, const nix::StorePath & drvPath,