Bring back constituents to Hydra
The part about finding `_hydraAggregate`/`constituents` is basically derived from `hydra-eval-jobs`, however the part about `namedConstituents` has been changed: we still stream out jobs when they appear, however we suppress this for aggregate jobs. These jobs are post-processed at the end, i.e. if `namedConstituents` exist, these will be mapped to the drvPath of the other jobs. Then, the drv will be rewritten to contain the drvPath of said jobs[1] and the JSON containing the rewritten `drvPath` will be printed out. [1] This was an optimization to reduce the memory footprint of evaluating e.g. the `tested` job in nixpkgs.
This commit is contained in:
parent
dfc286ca3d
commit
76f32ed29e
|
@ -43,7 +43,7 @@ queryIsCached(nix::Store &store,
|
|||
|
||||
/* The fields of a derivation that are printed in json form */
|
||||
Drv::Drv(std::string &attrPath, nix::EvalState &state, nix::DrvInfo &drvInfo,
|
||||
MyArgs &args) {
|
||||
MyArgs &args, std::optional<Constituents> constituents) : constituents(constituents) {
|
||||
|
||||
auto localStore = state.store.dynamic_pointer_cast<nix::LocalFSStore>();
|
||||
|
||||
|
@ -127,6 +127,11 @@ void to_json(nlohmann::json &json, const Drv &drv) {
|
|||
json["meta"] = drv.meta.value();
|
||||
}
|
||||
|
||||
if (auto constituents = drv.constituents) {
|
||||
json["constituents"] = constituents->constituents;
|
||||
json["namedConstituents"] = constituents->namedConstituents;
|
||||
}
|
||||
|
||||
if (drv.cacheStatus != Drv::CacheStatus::Unknown) {
|
||||
json["isCached"] = drv.cacheStatus == Drv::CacheStatus::Cached;
|
||||
}
|
||||
|
|
10
src/drv.hh
10
src/drv.hh
|
@ -17,6 +17,13 @@ class EvalState;
|
|||
struct DrvInfo;
|
||||
} // namespace nix
|
||||
|
||||
struct Constituents
|
||||
{
|
||||
std::vector<std::string> constituents;
|
||||
std::vector<std::string> namedConstituents;
|
||||
Constituents(std::vector<std::string> constituents, std::vector<std::string> namedConstituents) : constituents(constituents), namedConstituents(namedConstituents) { };
|
||||
};
|
||||
|
||||
/* The fields of a derivation that are printed in json form */
|
||||
struct Drv {
|
||||
std::string name;
|
||||
|
@ -27,7 +34,8 @@ struct Drv {
|
|||
std::map<std::string, std::optional<std::string>> outputs;
|
||||
std::map<std::string, std::set<std::string>> inputDrvs;
|
||||
std::optional<nlohmann::json> meta;
|
||||
std::optional<Constituents> constituents;
|
||||
|
||||
Drv(std::string &attrPath, nix::EvalState &state, nix::DrvInfo &drvInfo, MyArgs &args);
|
||||
Drv(std::string &attrPath, nix::EvalState &state, nix::DrvInfo &drvInfo, MyArgs &args, std::optional<Constituents> constituents);
|
||||
};
|
||||
void to_json(nlohmann::json &json, const Drv &drv);
|
||||
|
|
|
@ -62,6 +62,10 @@ MyArgs::MyArgs() : MixCommonArgs("nix-eval-jobs") {
|
|||
.description = "include derivation meta field in output",
|
||||
.handler = {&meta, true}});
|
||||
|
||||
addFlag({.longName = "constituents",
|
||||
.description = "whether to evaluate constituents for Hydra's aggregate feature",
|
||||
.handler = {&constituents, true}});
|
||||
|
||||
addFlag({.longName = "check-cache-status",
|
||||
.description =
|
||||
"Check if the derivations are present locally or in "
|
||||
|
|
|
@ -23,6 +23,7 @@ class MyArgs : virtual public nix::MixEvalArgs,
|
|||
bool impure = false;
|
||||
bool forceRecurse = false;
|
||||
bool checkCacheStatus = false;
|
||||
bool constituents = false;
|
||||
size_t nrWorkers = 1;
|
||||
size_t maxMemorySize = 4096;
|
||||
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
#include <lix/config.h> // IWYU pragma: keep
|
||||
|
||||
#include <lix/libstore/derivations.hh>
|
||||
#include <lix/libstore/local-fs-store.hh>
|
||||
#include <lix/libexpr/eval-settings.hh>
|
||||
#include <lix/libmain/shared.hh>
|
||||
#include <lix/libutil/sync.hh>
|
||||
|
@ -150,6 +152,7 @@ private:
|
|||
struct State {
|
||||
std::set<json> todo = json::array({json::array()});
|
||||
std::set<json> active;
|
||||
std::map<std::string, json> jobs;
|
||||
std::exception_ptr exc;
|
||||
};
|
||||
|
||||
|
@ -310,8 +313,12 @@ void collector(Sync<State> &state_, std::condition_variable &wakeup) {
|
|||
}
|
||||
} else {
|
||||
auto state(state_.lock());
|
||||
state->jobs.insert_or_assign(response["attr"], response);
|
||||
auto named = response.find("namedConstituents");
|
||||
if (named == response.end() || named->empty()) {
|
||||
std::cout << respString << "\n" << std::flush;
|
||||
}
|
||||
}
|
||||
|
||||
proc_ = std::move(proc);
|
||||
fromReader_ = std::move(fromReader);
|
||||
|
@ -385,5 +392,81 @@ int main(int argc, char **argv) {
|
|||
|
||||
if (state->exc)
|
||||
std::rethrow_exception(state->exc);
|
||||
|
||||
if (myArgs.constituents) {
|
||||
auto store = myArgs.evalStoreUrl
|
||||
? openStore(*myArgs.evalStoreUrl)
|
||||
: openStore();
|
||||
for (auto & [attr, job_json] : state->jobs) {
|
||||
auto namedConstituents = job_json.find("namedConstituents");
|
||||
if (namedConstituents != job_json.end() && !namedConstituents->empty()) {
|
||||
bool broken = false;
|
||||
auto drvPathAggregate = store->parseStorePath((std::string) job_json["drvPath"]);
|
||||
auto drvAggregate = store->readDerivation(drvPathAggregate);
|
||||
if (!job_json.contains("constituents")) {
|
||||
job_json["constituents"] = json::array();
|
||||
}
|
||||
std::vector<std::string> errors;
|
||||
for (auto child : *namedConstituents) {
|
||||
auto childJob = state->jobs.find(child);
|
||||
if (childJob == state->jobs.end()) {
|
||||
broken = true;
|
||||
errors.push_back(fmt("%s: does not exist", child));
|
||||
} else if (childJob->second.find("error") != childJob->second.end()) {
|
||||
broken = true;
|
||||
errors.push_back(fmt("%s: %s", child, childJob->second["error"]));
|
||||
} else {
|
||||
auto drvPathChild = store->parseStorePath((std::string) childJob->second["drvPath"]);
|
||||
auto drvChild = store->readDerivation(drvPathChild);
|
||||
job_json["constituents"].push_back(store->printStorePath(drvPathChild));
|
||||
drvAggregate.inputDrvs.map[drvPathChild].value = {drvChild.outputs.begin()->first};
|
||||
}
|
||||
}
|
||||
|
||||
if (broken) {
|
||||
json out;
|
||||
out["attr"] = job_json["attr"];
|
||||
out["error"] = concatStringsSep("\n", errors);
|
||||
out["constituents"] = json::array();
|
||||
std::cout << out.dump() << "\n" << std::flush;
|
||||
} else {
|
||||
std::string drvName(drvPathAggregate.name());
|
||||
assert(drvName.ends_with(nix::drvExtension));
|
||||
drvName.resize(drvName.size() - nix::drvExtension.size());
|
||||
|
||||
auto hashModulo = nix::hashDerivationModulo(*store, drvAggregate, true);
|
||||
if (hashModulo.kind != nix::DrvHash::Kind::Regular) continue;
|
||||
|
||||
auto h = hashModulo.hashes.find("out");
|
||||
if (h == hashModulo.hashes.end()) continue;
|
||||
auto outPath = store->makeOutputPath("out", h->second, drvName);
|
||||
drvAggregate.env["out"] = store->printStorePath(outPath);
|
||||
drvAggregate.outputs.insert_or_assign("out", nix::DerivationOutput::InputAddressed { .path = outPath });
|
||||
auto newDrvPath = store->printStorePath(nix::writeDerivation(*store, drvAggregate));
|
||||
|
||||
if (myArgs.gcRootsDir != "") {
|
||||
nix::Path root =
|
||||
myArgs.gcRootsDir + "/" +
|
||||
std::string(nix::baseNameOf(newDrvPath));
|
||||
if (!nix::pathExists(root)) {
|
||||
auto localStore =
|
||||
store
|
||||
.dynamic_pointer_cast<nix::LocalFSStore>();
|
||||
auto storePath =
|
||||
localStore->parseStorePath(newDrvPath);
|
||||
localStore->addPermRoot(storePath, root);
|
||||
}
|
||||
}
|
||||
|
||||
debug("rewrote aggregate derivation %s -> %s", store->printStorePath(drvPathAggregate), newDrvPath);
|
||||
|
||||
job_json["drvPath"] = newDrvPath;
|
||||
job_json["outputs"]["out"] = store->printStorePath(outPath);
|
||||
job_json.erase("namedConstituents");
|
||||
std::cout << job_json.dump() << "\n" << std::flush;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -124,7 +124,40 @@ void worker(nix::ref<nix::EvalState> state, nix::Bindings &autoArgs,
|
|||
|
||||
if (v->type() == nix::nAttrs) {
|
||||
if (auto drvInfo = nix::getDerivation(*state, *v, false)) {
|
||||
auto drv = Drv(attrPathS, *state, *drvInfo, args);
|
||||
std::optional<Constituents> maybeConstituents;
|
||||
if (args.constituents) {
|
||||
std::vector<std::string> constituents;
|
||||
std::vector<std::string> namedConstituents;
|
||||
auto a = v->attrs->get(state->symbols.create("_hydraAggregate"));
|
||||
if (a && state->forceBool(*a->value, a->pos, "while evaluating the `_hydraAggregate` attribute")) {
|
||||
auto a = v->attrs->get(state->symbols.create("constituents"));
|
||||
if (!a)
|
||||
state->error<nix::EvalError>("derivation must have a ‘constituents’ attribute").debugThrow();
|
||||
|
||||
nix::NixStringContext context;
|
||||
state->coerceToString(a->pos, *a->value, context, "while evaluating the `constituents` attribute", true, false);
|
||||
for (auto & c : context)
|
||||
std::visit(nix::overloaded {
|
||||
[&](const nix::NixStringContextElem::Built & b) {
|
||||
constituents.push_back(b.drvPath->to_string(*state->store));
|
||||
},
|
||||
[&](const nix::NixStringContextElem::Opaque & o) {
|
||||
},
|
||||
[&](const nix::NixStringContextElem::DrvDeep & d) {
|
||||
},
|
||||
}, c.raw);
|
||||
|
||||
state->forceList(*a->value, a->pos, "while evaluating the `constituents` attribute");
|
||||
for (unsigned int n = 0; n < a->value->listSize(); ++n) {
|
||||
auto v = a->value->listElems()[n];
|
||||
state->forceValue(*v, nix::noPos);
|
||||
if (v->type() == nix::nString)
|
||||
namedConstituents.push_back(std::string(v->str()));
|
||||
}
|
||||
}
|
||||
maybeConstituents = Constituents(constituents, namedConstituents);
|
||||
}
|
||||
auto drv = Drv(attrPathS, *state, *drvInfo, args, maybeConstituents);
|
||||
reply.update(drv);
|
||||
|
||||
/* Register the derivation as a GC root. !!! This
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
{
|
||||
inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable";
|
||||
|
||||
outputs = { nixpkgs, ... }:
|
||||
outputs = { self, nixpkgs, ... }:
|
||||
let
|
||||
pkgs = nixpkgs.legacyPackages.x86_64-linux;
|
||||
in
|
||||
|
@ -24,6 +24,30 @@
|
|||
builder = ":";
|
||||
};
|
||||
};
|
||||
success = {
|
||||
aggregate = pkgs.runCommand "aggregate" {
|
||||
_hydraAggregate = true;
|
||||
constituents = [
|
||||
self.hydraJobs.builtJob
|
||||
"anotherone"
|
||||
];
|
||||
} ''
|
||||
touch $out
|
||||
'';
|
||||
anotherone = pkgs.writeText "constituent" "text";
|
||||
};
|
||||
failures = {
|
||||
aggregate = pkgs.runCommand "aggregate" {
|
||||
_hydraAggregate = true;
|
||||
constituents = [
|
||||
"doesntexist"
|
||||
"doesnteval"
|
||||
];
|
||||
} ''
|
||||
touch $out
|
||||
'';
|
||||
doesnteval = pkgs.writeText "constituent" (toString {});
|
||||
};
|
||||
};
|
||||
};
|
||||
}
|
||||
|
|
|
@ -95,6 +95,70 @@ def test_eval_error() -> None:
|
|||
assert "this is an evaluation error" in attrs["error"]
|
||||
|
||||
|
||||
def test_constituents() -> None:
|
||||
with TemporaryDirectory() as tempdir:
|
||||
cmd = [
|
||||
str(BIN),
|
||||
"--gc-roots-dir",
|
||||
tempdir,
|
||||
"--meta",
|
||||
"--workers",
|
||||
"1",
|
||||
"--flake",
|
||||
".#legacyPackages.x86_64-linux.success",
|
||||
"--constituents",
|
||||
]
|
||||
res = subprocess.run(
|
||||
cmd,
|
||||
cwd=TEST_ROOT.joinpath("assets"),
|
||||
text=True,
|
||||
stdout=subprocess.PIPE,
|
||||
)
|
||||
print(res.stdout)
|
||||
results = [json.loads(r) for r in res.stdout.split("\n") if r]
|
||||
assert len(results) == 2
|
||||
child = results[0]
|
||||
assert child["attr"] == "anotherone"
|
||||
aggregate = results[1]
|
||||
assert aggregate["attr"] == "aggregate"
|
||||
assert "namedConstituents" not in aggregate
|
||||
assert aggregate["constituents"][0].endswith("-job1.drv")
|
||||
assert aggregate["constituents"][1] == child["drvPath"]
|
||||
assert "error" not in aggregate
|
||||
|
||||
|
||||
def test_constituents_error() -> None:
|
||||
with TemporaryDirectory() as tempdir:
|
||||
cmd = [
|
||||
str(BIN),
|
||||
"--gc-roots-dir",
|
||||
tempdir,
|
||||
"--meta",
|
||||
"--workers",
|
||||
"1",
|
||||
"--flake",
|
||||
".#legacyPackages.x86_64-linux.failures",
|
||||
"--constituents",
|
||||
]
|
||||
res = subprocess.run(
|
||||
cmd,
|
||||
cwd=TEST_ROOT.joinpath("assets"),
|
||||
text=True,
|
||||
stdout=subprocess.PIPE,
|
||||
)
|
||||
print(res.stdout)
|
||||
results = [json.loads(r) for r in res.stdout.split("\n") if r]
|
||||
assert len(results) == 2
|
||||
child = results[0]
|
||||
assert child["attr"] == "doesnteval"
|
||||
assert "error" in child
|
||||
aggregate = results[1]
|
||||
assert aggregate["attr"] == "aggregate"
|
||||
assert "namedConstituents" not in aggregate
|
||||
assert aggregate["error"].startswith('"doesntexist": does not exist\n"doesnteval": "error: derivation ')
|
||||
assert aggregate["constituents"] == []
|
||||
|
||||
|
||||
@pytest.mark.infiniterecursion
|
||||
def test_recursion_error() -> None:
|
||||
with TemporaryDirectory() as tempdir:
|
||||
|
|
Loading…
Reference in a new issue