Bring back constituents to Hydra

The part about finding `_hydraAggregate`/`constituents` is basically
derived from `hydra-eval-jobs`, however the part about
`namedConstituents` has been changed: we still stream out jobs when they
appear, however we suppress this for aggregate jobs.

These jobs are post-processed at the end, i.e. if `namedConstituents`
exist, these will be mapped to the drvPath of the other jobs. Then, the
drv will be rewritten to contain the drvPath of said jobs[1] and the
JSON containing the rewritten `drvPath` will be printed out.

[1] This was an optimization to reduce the memory footprint of
    evaluating e.g. the `tested` job in nixpkgs.
This commit is contained in:
Maximilian Bosch 2024-10-13 19:13:22 +02:00
parent df3edf3730
commit ac27a3f9d5
Signed by: ma27
SSH key fingerprint: SHA256:d7dmwHmpai66L6KIXA+wxzVbkPq0nGLrcHK3ZNroqZY
8 changed files with 227 additions and 5 deletions

View file

@ -43,7 +43,7 @@ queryIsCached(nix::Store &store,
/* The fields of a derivation that are printed in json form */
Drv::Drv(std::string &attrPath, nix::EvalState &state, nix::DrvInfo &drvInfo,
MyArgs &args) {
MyArgs &args, std::optional<Constituents> constituents) : constituents(constituents) {
auto localStore = state.ctx.store.dynamic_pointer_cast<nix::LocalFSStore>();
@ -128,6 +128,11 @@ void to_json(nlohmann::json &json, const Drv &drv) {
json["meta"] = drv.meta.value();
}
if (auto constituents = drv.constituents) {
json["constituents"] = constituents->constituents;
json["namedConstituents"] = constituents->namedConstituents;
}
if (drv.cacheStatus != Drv::CacheStatus::Unknown) {
json["isCached"] = drv.cacheStatus == Drv::CacheStatus::Cached;
}

View file

@ -17,6 +17,13 @@ class EvalState;
struct DrvInfo;
} // namespace nix
struct Constituents
{
std::vector<std::string> constituents;
std::vector<std::string> namedConstituents;
Constituents(std::vector<std::string> constituents, std::vector<std::string> namedConstituents) : constituents(constituents), namedConstituents(namedConstituents) { };
};
/* The fields of a derivation that are printed in json form */
struct Drv {
std::string name;
@ -27,7 +34,8 @@ struct Drv {
std::map<std::string, std::optional<std::string>> outputs;
std::map<std::string, std::set<std::string>> inputDrvs;
std::optional<nlohmann::json> meta;
std::optional<Constituents> constituents;
Drv(std::string &attrPath, nix::EvalState &state, nix::DrvInfo &drvInfo, MyArgs &args);
Drv(std::string &attrPath, nix::EvalState &state, nix::DrvInfo &drvInfo, MyArgs &args, std::optional<Constituents> constituents);
};
void to_json(nlohmann::json &json, const Drv &drv);

View file

@ -62,6 +62,10 @@ MyArgs::MyArgs() : MixCommonArgs("nix-eval-jobs") {
.description = "include derivation meta field in output",
.handler = {&meta, true}});
addFlag({.longName = "constituents",
.description = "whether to evaluate constituents for Hydra's aggregate feature",
.handler = {&constituents, true}});
addFlag({.longName = "check-cache-status",
.description =
"Check if the derivations are present locally or in "

View file

@ -23,6 +23,7 @@ class MyArgs : virtual public nix::MixEvalArgs,
bool impure = false;
bool forceRecurse = false;
bool checkCacheStatus = false;
bool constituents = false;
size_t nrWorkers = 1;
size_t maxMemorySize = 4096;

View file

@ -1,5 +1,7 @@
#include <lix/config.h> // IWYU pragma: keep
#include <lix/libstore/derivations.hh>
#include <lix/libstore/local-fs-store.hh>
#include <lix/libexpr/eval-settings.hh>
#include <lix/libmain/shared.hh>
#include <lix/libutil/sync.hh>
@ -148,6 +150,7 @@ private:
struct State {
std::set<json> todo = json::array({json::array()});
std::set<json> active;
std::map<std::string, json> jobs;
std::exception_ptr exc;
};
@ -308,7 +311,11 @@ void collector(Sync<State> &state_, std::condition_variable &wakeup) {
}
} else {
auto state(state_.lock());
std::cout << respString << "\n" << std::flush;
state->jobs.insert_or_assign(response["attr"], response);
auto named = response.find("namedConstituents");
if (named == response.end() || named->empty()) {
std::cout << respString << "\n" << std::flush;
}
}
proc_ = std::move(proc);
@ -383,5 +390,81 @@ int main(int argc, char **argv) {
if (state->exc)
std::rethrow_exception(state->exc);
if (myArgs.constituents) {
auto store = myArgs.evalStoreUrl
? openStore(*myArgs.evalStoreUrl)
: openStore();
for (auto & [attr, job_json] : state->jobs) {
auto namedConstituents = job_json.find("namedConstituents");
if (namedConstituents != job_json.end() && !namedConstituents->empty()) {
bool broken = false;
auto drvPathAggregate = store->parseStorePath((std::string) job_json["drvPath"]);
auto drvAggregate = store->readDerivation(drvPathAggregate);
if (!job_json.contains("constituents")) {
job_json["constituents"] = json::array();
}
std::vector<std::string> errors;
for (auto child : *namedConstituents) {
auto childJob = state->jobs.find(child);
if (childJob == state->jobs.end()) {
broken = true;
errors.push_back(fmt("%s: does not exist", child));
} else if (childJob->second.find("error") != childJob->second.end()) {
broken = true;
errors.push_back(fmt("%s: %s", child, childJob->second["error"]));
} else {
auto drvPathChild = store->parseStorePath((std::string) childJob->second["drvPath"]);
auto drvChild = store->readDerivation(drvPathChild);
job_json["constituents"].push_back(store->printStorePath(drvPathChild));
drvAggregate.inputDrvs.map[drvPathChild].value = {drvChild.outputs.begin()->first};
}
}
if (broken) {
json out;
out["attr"] = job_json["attr"];
out["error"] = concatStringsSep("\n", errors);
out["constituents"] = json::array();
std::cout << out.dump() << "\n" << std::flush;
} else {
std::string drvName(drvPathAggregate.name());
assert(drvName.ends_with(nix::drvExtension));
drvName.resize(drvName.size() - nix::drvExtension.size());
auto hashModulo = nix::hashDerivationModulo(*store, drvAggregate, true);
if (hashModulo.kind != nix::DrvHash::Kind::Regular) continue;
auto h = hashModulo.hashes.find("out");
if (h == hashModulo.hashes.end()) continue;
auto outPath = store->makeOutputPath("out", h->second, drvName);
drvAggregate.env["out"] = store->printStorePath(outPath);
drvAggregate.outputs.insert_or_assign("out", nix::DerivationOutput::InputAddressed { .path = outPath });
auto newDrvPath = store->printStorePath(nix::writeDerivation(*store, drvAggregate));
if (myArgs.gcRootsDir != "") {
nix::Path root =
myArgs.gcRootsDir + "/" +
std::string(nix::baseNameOf(newDrvPath));
if (!nix::pathExists(root)) {
auto localStore =
store
.dynamic_pointer_cast<nix::LocalFSStore>();
auto storePath =
localStore->parseStorePath(newDrvPath);
localStore->addPermRoot(storePath, root);
}
}
debug("rewrote aggregate derivation %s -> %s", store->printStorePath(drvPathAggregate), newDrvPath);
job_json["drvPath"] = newDrvPath;
job_json["outputs"]["out"] = store->printStorePath(outPath);
job_json.erase("namedConstituents");
std::cout << job_json.dump() << "\n" << std::flush;
}
}
}
}
});
}

View file

@ -127,7 +127,40 @@ void worker(nix::ref<nix::eval_cache::CachingEvaluator> evaluator, nix::Bindings
if (v->type() == nix::nAttrs) {
if (auto drvInfo = nix::getDerivation(*state, *v, false)) {
auto drv = Drv(attrPathS, *state, *drvInfo, args);
std::optional<Constituents> maybeConstituents;
if (args.constituents) {
std::vector<std::string> constituents;
std::vector<std::string> namedConstituents;
auto a = v->attrs->get(evaluator->symbols.create("_hydraAggregate"));
if (a && state->forceBool(*a->value, a->pos, "while evaluating the `_hydraAggregate` attribute")) {
auto a = v->attrs->get(evaluator->symbols.create("constituents"));
if (!a)
state->ctx.errors.make<nix::EvalError>("derivation must have a constituents attribute").debugThrow();
nix::NixStringContext context;
state->coerceToString(a->pos, *a->value, context, "while evaluating the `constituents` attribute", true, false);
for (auto & c : context)
std::visit(nix::overloaded {
[&](const nix::NixStringContextElem::Built & b) {
constituents.push_back(b.drvPath->to_string(*evaluator->store));
},
[&](const nix::NixStringContextElem::Opaque & o) {
},
[&](const nix::NixStringContextElem::DrvDeep & d) {
},
}, c.raw);
state->forceList(*a->value, a->pos, "while evaluating the `constituents` attribute");
for (unsigned int n = 0; n < a->value->listSize(); ++n) {
auto v = a->value->listElems()[n];
state->forceValue(*v, nix::noPos);
if (v->type() == nix::nString)
namedConstituents.push_back(std::string(v->str()));
}
}
maybeConstituents = Constituents(constituents, namedConstituents);
}
auto drv = Drv(attrPathS, *state, *drvInfo, args, maybeConstituents);
reply.update(drv);
/* Register the derivation as a GC root. !!! This

View file

@ -1,7 +1,7 @@
{
inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable";
outputs = { nixpkgs, ... }:
outputs = { self, nixpkgs, ... }:
let
pkgs = nixpkgs.legacyPackages.x86_64-linux;
in
@ -24,6 +24,30 @@
builder = ":";
};
};
success = {
aggregate = pkgs.runCommand "aggregate" {
_hydraAggregate = true;
constituents = [
self.hydraJobs.builtJob
"anotherone"
];
} ''
touch $out
'';
anotherone = pkgs.writeText "constituent" "text";
};
failures = {
aggregate = pkgs.runCommand "aggregate" {
_hydraAggregate = true;
constituents = [
"doesntexist"
"doesnteval"
];
} ''
touch $out
'';
doesnteval = pkgs.writeText "constituent" (toString {});
};
};
};
}

View file

@ -95,6 +95,70 @@ def test_eval_error() -> None:
assert "this is an evaluation error" in attrs["error"]
def test_constituents() -> None:
with TemporaryDirectory() as tempdir:
cmd = [
str(BIN),
"--gc-roots-dir",
tempdir,
"--meta",
"--workers",
"1",
"--flake",
".#legacyPackages.x86_64-linux.success",
"--constituents",
]
res = subprocess.run(
cmd,
cwd=TEST_ROOT.joinpath("assets"),
text=True,
stdout=subprocess.PIPE,
)
print(res.stdout)
results = [json.loads(r) for r in res.stdout.split("\n") if r]
assert len(results) == 2
child = results[0]
assert child["attr"] == "anotherone"
aggregate = results[1]
assert aggregate["attr"] == "aggregate"
assert "namedConstituents" not in aggregate
assert aggregate["constituents"][0].endswith("-job1.drv")
assert aggregate["constituents"][1] == child["drvPath"]
assert "error" not in aggregate
def test_constituents_error() -> None:
with TemporaryDirectory() as tempdir:
cmd = [
str(BIN),
"--gc-roots-dir",
tempdir,
"--meta",
"--workers",
"1",
"--flake",
".#legacyPackages.x86_64-linux.failures",
"--constituents",
]
res = subprocess.run(
cmd,
cwd=TEST_ROOT.joinpath("assets"),
text=True,
stdout=subprocess.PIPE,
)
print(res.stdout)
results = [json.loads(r) for r in res.stdout.split("\n") if r]
assert len(results) == 2
child = results[0]
assert child["attr"] == "doesnteval"
assert "error" in child
aggregate = results[1]
assert aggregate["attr"] == "aggregate"
assert "namedConstituents" not in aggregate
assert aggregate["error"].startswith('"doesntexist": does not exist\n"doesnteval": "error: derivation ')
assert aggregate["constituents"] == []
@pytest.mark.infiniterecursion
def test_recursion_error() -> None:
with TemporaryDirectory() as tempdir: