From 5ce92e2c4d318b30edaafa4d8664b9e15e941f32 Mon Sep 17 00:00:00 2001 From: Maximilian Bosch Date: Sun, 13 Oct 2024 19:13:22 +0200 Subject: [PATCH] Bring back constituents to Hydra The part about finding `_hydraAggregate`/`constituents` is basically derived from `hydra-eval-jobs`, however the part about `namedConstituents` has been changed: we still stream out jobs when they appear, however we suppress this for aggregate jobs. These jobs are post-processed at the end, i.e. if `namedConstituents` exist, these will be mapped to the drvPath of the other jobs. Then, the drv will be rewritten to contain the drvPath of said jobs[1] and the JSON containing the rewritten `drvPath` will be printed out. [1] This was an optimization to reduce the memory footprint of evaluating e.g. the `tested` job in nixpkgs. --- src/drv.cc | 7 +++- src/drv.hh | 10 ++++- src/eval-args.cc | 4 ++ src/eval-args.hh | 1 + src/nix-eval-jobs.cc | 85 +++++++++++++++++++++++++++++++++++++++++- src/worker.cc | 35 ++++++++++++++++- tests/assets/flake.nix | 26 ++++++++++++- tests/test_eval.py | 64 +++++++++++++++++++++++++++++++ 8 files changed, 227 insertions(+), 5 deletions(-) diff --git a/src/drv.cc b/src/drv.cc index dca58de..7e490c9 100644 --- a/src/drv.cc +++ b/src/drv.cc @@ -43,7 +43,7 @@ queryIsCached(nix::Store &store, /* The fields of a derivation that are printed in json form */ Drv::Drv(std::string &attrPath, nix::EvalState &state, nix::DrvInfo &drvInfo, - MyArgs &args) { + MyArgs &args, std::optional constituents) : constituents(constituents) { auto localStore = state.store.dynamic_pointer_cast(); @@ -127,6 +127,11 @@ void to_json(nlohmann::json &json, const Drv &drv) { json["meta"] = drv.meta.value(); } + if (auto constituents = drv.constituents) { + json["constituents"] = constituents->constituents; + json["namedConstituents"] = constituents->namedConstituents; + } + if (drv.cacheStatus != Drv::CacheStatus::Unknown) { json["isCached"] = drv.cacheStatus == Drv::CacheStatus::Cached; } diff --git a/src/drv.hh b/src/drv.hh index 4cfc6a0..41b1233 100644 --- a/src/drv.hh +++ b/src/drv.hh @@ -17,6 +17,13 @@ class EvalState; struct DrvInfo; } // namespace nix +struct Constituents +{ + std::vector constituents; + std::vector namedConstituents; + Constituents(std::vector constituents, std::vector namedConstituents) : constituents(constituents), namedConstituents(namedConstituents) { }; +}; + /* The fields of a derivation that are printed in json form */ struct Drv { std::string name; @@ -27,7 +34,8 @@ struct Drv { std::map> outputs; std::map> inputDrvs; std::optional meta; + std::optional constituents; - Drv(std::string &attrPath, nix::EvalState &state, nix::DrvInfo &drvInfo, MyArgs &args); + Drv(std::string &attrPath, nix::EvalState &state, nix::DrvInfo &drvInfo, MyArgs &args, std::optional constituents); }; void to_json(nlohmann::json &json, const Drv &drv); diff --git a/src/eval-args.cc b/src/eval-args.cc index b127638..66d2ea9 100644 --- a/src/eval-args.cc +++ b/src/eval-args.cc @@ -62,6 +62,10 @@ MyArgs::MyArgs() : MixCommonArgs("nix-eval-jobs") { .description = "include derivation meta field in output", .handler = {&meta, true}}); + addFlag({.longName = "constituents", + .description = "whether to evaluate constituents for Hydra's aggregate feature", + .handler = {&constituents, true}}); + addFlag({.longName = "check-cache-status", .description = "Check if the derivations are present locally or in " diff --git a/src/eval-args.hh b/src/eval-args.hh index b0932fb..7be13da 100644 --- a/src/eval-args.hh +++ b/src/eval-args.hh @@ -23,6 +23,7 @@ class MyArgs : virtual public nix::MixEvalArgs, bool impure = false; bool forceRecurse = false; bool checkCacheStatus = false; + bool constituents = false; size_t nrWorkers = 1; size_t maxMemorySize = 4096; diff --git a/src/nix-eval-jobs.cc b/src/nix-eval-jobs.cc index ea45a41..2e3c04e 100644 --- a/src/nix-eval-jobs.cc +++ b/src/nix-eval-jobs.cc @@ -1,5 +1,7 @@ #include // IWYU pragma: keep +#include +#include #include #include #include @@ -150,6 +152,7 @@ private: struct State { std::set todo = json::array({json::array()}); std::set active; + std::map jobs; std::exception_ptr exc; }; @@ -310,7 +313,11 @@ void collector(Sync &state_, std::condition_variable &wakeup) { } } else { auto state(state_.lock()); - std::cout << respString << "\n" << std::flush; + state->jobs.insert_or_assign(response["attr"], response); + auto named = response.find("namedConstituents"); + if (named == response.end() || named->empty()) { + std::cout << respString << "\n" << std::flush; + } } proc_ = std::move(proc); @@ -393,5 +400,81 @@ int main(int argc, char **argv) { if (state->exc) std::rethrow_exception(state->exc); + + if (myArgs.constituents) { + auto store = myArgs.evalStoreUrl + ? openStore(*myArgs.evalStoreUrl) + : openStore(); + for (auto & [attr, job_json] : state->jobs) { + auto namedConstituents = job_json.find("namedConstituents"); + if (namedConstituents != job_json.end() && !namedConstituents->empty()) { + bool broken = false; + auto drvPathAggregate = store->parseStorePath((std::string) job_json["drvPath"]); + auto drvAggregate = store->readDerivation(drvPathAggregate); + if (!job_json.contains("constituents")) { + job_json["constituents"] = json::array(); + } + std::vector errors; + for (auto child : *namedConstituents) { + auto childJob = state->jobs.find(child); + if (childJob == state->jobs.end()) { + broken = true; + errors.push_back(fmt("%s: does not exist", child)); + } else if (childJob->second.find("error") != childJob->second.end()) { + broken = true; + errors.push_back(fmt("%s: %s", child, childJob->second["error"])); + } else { + auto drvPathChild = store->parseStorePath((std::string) childJob->second["drvPath"]); + auto drvChild = store->readDerivation(drvPathChild); + job_json["constituents"].push_back(store->printStorePath(drvPathChild)); + drvAggregate.inputDrvs.map[drvPathChild].value = {drvChild.outputs.begin()->first}; + } + } + + if (broken) { + json out; + out["attr"] = job_json["attr"]; + out["error"] = concatStringsSep("\n", errors); + out["constituents"] = json::array(); + std::cout << out.dump() << "\n" << std::flush; + } else { + std::string drvName(drvPathAggregate.name()); + assert(drvName.ends_with(nix::drvExtension)); + drvName.resize(drvName.size() - nix::drvExtension.size()); + + auto hashModulo = nix::hashDerivationModulo(*store, drvAggregate, true); + if (hashModulo.kind != nix::DrvHash::Kind::Regular) continue; + + auto h = hashModulo.hashes.find("out"); + if (h == hashModulo.hashes.end()) continue; + auto outPath = store->makeOutputPath("out", h->second, drvName); + drvAggregate.env["out"] = store->printStorePath(outPath); + drvAggregate.outputs.insert_or_assign("out", nix::DerivationOutput::InputAddressed { .path = outPath }); + auto newDrvPath = store->printStorePath(nix::writeDerivation(*store, drvAggregate)); + + if (myArgs.gcRootsDir != "") { + nix::Path root = + myArgs.gcRootsDir + "/" + + std::string(nix::baseNameOf(newDrvPath)); + if (!nix::pathExists(root)) { + auto localStore = + store + .dynamic_pointer_cast(); + auto storePath = + localStore->parseStorePath(newDrvPath); + localStore->addPermRoot(storePath, root); + } + } + + debug("rewrote aggregate derivation %s -> %s", store->printStorePath(drvPathAggregate), newDrvPath); + + job_json["drvPath"] = newDrvPath; + job_json["outputs"]["out"] = store->printStorePath(outPath); + job_json.erase("namedConstituents"); + std::cout << job_json.dump() << "\n" << std::flush; + } + } + } + } }); } diff --git a/src/worker.cc b/src/worker.cc index 5e5ff9f..f429e1a 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -124,7 +124,40 @@ void worker(nix::ref state, nix::Bindings &autoArgs, if (v->type() == nix::nAttrs) { if (auto drvInfo = nix::getDerivation(*state, *v, false)) { - auto drv = Drv(attrPathS, *state, *drvInfo, args); + std::optional maybeConstituents; + if (args.constituents) { + std::vector constituents; + std::vector namedConstituents; + auto a = v->attrs->get(state->symbols.create("_hydraAggregate")); + if (a && state->forceBool(*a->value, a->pos, "while evaluating the `_hydraAggregate` attribute")) { + auto a = v->attrs->get(state->symbols.create("constituents")); + if (!a) + state->error("derivation must have a ‘constituents’ attribute").debugThrow(); + + nix::NixStringContext context; + state->coerceToString(a->pos, *a->value, context, "while evaluating the `constituents` attribute", true, false); + for (auto & c : context) + std::visit(nix::overloaded { + [&](const nix::NixStringContextElem::Built & b) { + constituents.push_back(b.drvPath->to_string(*state->store)); + }, + [&](const nix::NixStringContextElem::Opaque & o) { + }, + [&](const nix::NixStringContextElem::DrvDeep & d) { + }, + }, c.raw); + + state->forceList(*a->value, a->pos, "while evaluating the `constituents` attribute"); + for (unsigned int n = 0; n < a->value->listSize(); ++n) { + auto v = a->value->listElems()[n]; + state->forceValue(*v, nix::noPos); + if (v->type() == nix::nString) + namedConstituents.push_back(std::string(v->str())); + } + } + maybeConstituents = Constituents(constituents, namedConstituents); + } + auto drv = Drv(attrPathS, *state, *drvInfo, args, maybeConstituents); reply.update(drv); /* Register the derivation as a GC root. !!! This diff --git a/tests/assets/flake.nix b/tests/assets/flake.nix index 5a32277..2840fbd 100644 --- a/tests/assets/flake.nix +++ b/tests/assets/flake.nix @@ -1,7 +1,7 @@ { inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable"; - outputs = { nixpkgs, ... }: + outputs = { self, nixpkgs, ... }: let pkgs = nixpkgs.legacyPackages.x86_64-linux; in @@ -24,6 +24,30 @@ builder = ":"; }; }; + success = { + aggregate = pkgs.runCommand "aggregate" { + _hydraAggregate = true; + constituents = [ + self.hydraJobs.builtJob + "anotherone" + ]; + } '' + touch $out + ''; + anotherone = pkgs.writeText "constituent" "text"; + }; + failures = { + aggregate = pkgs.runCommand "aggregate" { + _hydraAggregate = true; + constituents = [ + "doesntexist" + "doesnteval" + ]; + } '' + touch $out + ''; + doesnteval = pkgs.writeText "constituent" (toString {}); + }; }; }; } diff --git a/tests/test_eval.py b/tests/test_eval.py index 8fa41cb..c56834a 100644 --- a/tests/test_eval.py +++ b/tests/test_eval.py @@ -95,6 +95,70 @@ def test_eval_error() -> None: assert "this is an evaluation error" in attrs["error"] +def test_constituents() -> None: + with TemporaryDirectory() as tempdir: + cmd = [ + str(BIN), + "--gc-roots-dir", + tempdir, + "--meta", + "--workers", + "1", + "--flake", + ".#legacyPackages.x86_64-linux.success", + "--constituents", + ] + res = subprocess.run( + cmd, + cwd=TEST_ROOT.joinpath("assets"), + text=True, + stdout=subprocess.PIPE, + ) + print(res.stdout) + results = [json.loads(r) for r in res.stdout.split("\n") if r] + assert len(results) == 2 + child = results[0] + assert child["attr"] == "anotherone" + aggregate = results[1] + assert aggregate["attr"] == "aggregate" + assert "namedConstituents" not in aggregate + assert aggregate["constituents"][0].endswith("-job1.drv") + assert aggregate["constituents"][1] == child["drvPath"] + assert "error" not in aggregate + + +def test_constituents_error() -> None: + with TemporaryDirectory() as tempdir: + cmd = [ + str(BIN), + "--gc-roots-dir", + tempdir, + "--meta", + "--workers", + "1", + "--flake", + ".#legacyPackages.x86_64-linux.failures", + "--constituents", + ] + res = subprocess.run( + cmd, + cwd=TEST_ROOT.joinpath("assets"), + text=True, + stdout=subprocess.PIPE, + ) + print(res.stdout) + results = [json.loads(r) for r in res.stdout.split("\n") if r] + assert len(results) == 2 + child = results[0] + assert child["attr"] == "doesnteval" + assert "error" in child + aggregate = results[1] + assert aggregate["attr"] == "aggregate" + assert "namedConstituents" not in aggregate + assert aggregate["error"].startswith('"doesntexist": does not exist\n"doesnteval": "error: derivation ') + assert aggregate["constituents"] == [] + + @pytest.mark.infiniterecursion def test_recursion_error() -> None: with TemporaryDirectory() as tempdir: