nix-eval-jobs/src/nix-eval-jobs.cc

592 lines
20 KiB
C++
Raw Normal View History

2020-11-29 14:33:55 +00:00
#include <map>
#include <iostream>
#include <thread>
#include <filesystem>
2020-11-29 14:33:55 +00:00
#include <nix/config.h>
2020-11-29 20:32:04 +00:00
#include <nix/shared.hh>
#include <nix/store-api.hh>
#include <nix/eval.hh>
#include <nix/eval-inline.hh>
#include <nix/util.hh>
#include <nix/get-drvs.hh>
#include <nix/globals.hh>
#include <nix/common-eval-args.hh>
#include <nix/flake/flakeref.hh>
#include <nix/flake/flake.hh>
#include <nix/attr-path.hh>
#include <nix/derivations.hh>
#include <nix/local-fs-store.hh>
2022-01-07 07:20:41 +00:00
#include <nix/logging.hh>
#include <nix/error.hh>
#include <nix/installables.hh>
2023-01-18 15:38:38 +00:00
#include <nix/path-with-outputs.hh>
2023-03-09 21:18:00 +00:00
#include <nix/installable-flake.hh>
2020-11-29 14:33:55 +00:00
#include <nix/value-to-json.hh>
2020-11-29 14:33:55 +00:00
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/resource.h>
#include <nlohmann/json.hpp>
using namespace nix;
using namespace nlohmann;
2020-11-29 14:33:55 +00:00
// Safe to ignore - the args will be static.
2022-04-25 15:53:17 +00:00
#ifdef __GNUC__
#pragma GCC diagnostic ignored "-Wnon-virtual-dtor"
2022-04-25 15:53:17 +00:00
#elif __clang__
#pragma clang diagnostic ignored "-Wnon-virtual-dtor"
2022-04-25 15:53:17 +00:00
#endif
struct MyArgs : MixEvalArgs, MixCommonArgs {
std::string releaseExpr;
Path gcRootsDir;
2020-11-29 14:33:55 +00:00
bool flake = false;
bool fromArgs = false;
bool meta = false;
2022-01-07 07:20:41 +00:00
bool showTrace = false;
bool impure = false;
bool forceRecurse = false;
bool checkCacheStatus = false;
2020-11-29 14:33:55 +00:00
size_t nrWorkers = 1;
size_t maxMemorySize = 4096;
2023-07-14 05:03:41 +00:00
// usually in MixFlakeOptions
flake::LockFlags lockFlags = {
.updateLockFile = false,
.writeLockFile = false,
.useRegistries = false,
.allowUnlocked = false
};
MyArgs() : MixCommonArgs("nix-eval-jobs") {
2020-11-29 14:33:55 +00:00
addFlag({
.longName = "help",
.description = "show usage information",
.handler = {[&]() {
printf("USAGE: nix-eval-jobs [options] expr\n\n");
for (const auto &[name, flag] : longFlags) {
2021-03-14 22:16:57 +00:00
if (hiddenCategories.count(flag->category)) {
continue;
}
printf(" --%-20s %s\n", name.c_str(),
flag->description.c_str());
2021-03-14 22:16:57 +00:00
}
::exit(0);
2021-03-14 22:16:57 +00:00
}},
2020-11-29 14:33:55 +00:00
});
addFlag({.longName = "impure",
.description = "allow impure expressions",
.handler = {&impure, true}});
addFlag(
{.longName = "force-recurse",
.description = "force recursion (don't respect recurseIntoAttrs)",
.handler = {&forceRecurse, true}});
addFlag({.longName = "gc-roots-dir",
.description = "garbage collector roots directory",
.labels = {"path"},
.handler = {&gcRootsDir}});
2020-11-29 14:33:55 +00:00
2023-06-10 11:51:59 +00:00
addFlag({.longName = "workers",
.description = "number of evaluate workers",
.labels = {"workers"},
.handler = {
[=, this](std::string s) { nrWorkers = std::stoi(s); }}});
2020-11-29 14:33:55 +00:00
2023-04-13 17:29:18 +00:00
addFlag(
{.longName = "max-memory-size",
.description =
"maximum evaluation memory size (4GiB per worker by default)",
.labels = {"size"},
.handler = {
2023-06-10 11:51:59 +00:00
[=, this](std::string s) { maxMemorySize = std::stoi(s); }}});
2020-11-29 14:33:55 +00:00
addFlag({.longName = "flake",
.description = "build a flake",
.handler = {&flake, true}});
2020-11-29 14:33:55 +00:00
addFlag({.longName = "meta",
.description = "include derivation meta field in output",
.handler = {&meta, true}});
addFlag(
{.longName = "check-cache-status",
.description =
"Check if the derivations are present locally or in "
"any configured substituters (i.e. binary cache). The "
"information "
"will be exposed in the `isCached` field of the JSON output.",
.handler = {&checkCacheStatus, true}});
addFlag({.longName = "show-trace",
.description =
"print out a stack trace in case of evaluation errors",
.handler = {&showTrace, true}});
2022-01-07 07:20:41 +00:00
addFlag({.longName = "expr",
.shortName = 'E',
.description = "treat the argument as a Nix expression",
.handler = {&fromArgs, true}});
2023-07-14 05:03:41 +00:00
// usually in MixFlakeOptions
addFlag({
.longName = "override-input",
.description = "Override a specific flake input (e.g. `dwarffs/nixpkgs`).",
.category = category,
.labels = {"input-path", "flake-url"},
.handler = {[&](std::string inputPath, std::string flakeRef) {
// overriden inputs are unlocked
lockFlags.allowUnlocked = true;
lockFlags.inputOverrides.insert_or_assign(
flake::parseInputPath(inputPath),
parseFlakeRef(flakeRef, absPath("."), true));
}},
});
2020-11-29 14:33:55 +00:00
expectArg("expr", &releaseExpr);
}
};
2022-04-25 15:53:17 +00:00
#ifdef __GNUC__
#pragma GCC diagnostic ignored "-Wnon-virtual-dtor"
#elif __clang__
#pragma clang diagnostic ignored "-Wnon-virtual-dtor"
#endif
2020-11-29 14:33:55 +00:00
static MyArgs myArgs;
static Value *releaseExprTopLevelValue(EvalState &state, Bindings &autoArgs) {
2020-11-29 14:33:55 +00:00
Value vTop;
if (myArgs.fromArgs) {
2023-06-10 11:28:57 +00:00
Expr *e = state.parseExprFromString(
myArgs.releaseExpr, state.rootPath(CanonPath::fromCwd()));
state.eval(e, vTop);
} else {
state.evalFile(lookupFileArg(state, myArgs.releaseExpr), vTop);
}
auto vRoot = state.allocValue();
state.autoCallFunction(autoArgs, vTop, *vRoot);
2020-11-29 14:33:55 +00:00
return vRoot;
}
2020-11-29 14:33:55 +00:00
bool queryIsCached(Store &store, std::map<std::string, std::string> &outputs) {
uint64_t downloadSize, narSize;
StorePathSet willBuild, willSubstitute, unknown;
std::vector<StorePathWithOutputs> paths;
for (auto const &[key, val] : outputs) {
paths.push_back(followLinksToStorePathWithOutputs(store, val));
}
store.queryMissing(toDerivedPaths(paths), willBuild, willSubstitute,
unknown, downloadSize, narSize);
return willBuild.empty() && unknown.empty();
}
/* The fields of a derivation that are printed in json form */
struct Drv {
std::string name;
std::string system;
std::string drvPath;
bool isCached;
std::map<std::string, std::string> outputs;
std::map<std::string, std::set<std::string>> inputDrvs;
std::optional<nlohmann::json> meta;
Drv(EvalState &state, DrvInfo &drvInfo) {
if (drvInfo.querySystem() == "unknown")
throw EvalError("derivation must have a 'system' attribute");
auto localStore = state.store.dynamic_pointer_cast<LocalFSStore>();
try {
for (auto out : drvInfo.queryOutputs(true)) {
if (out.second)
outputs[out.first] =
localStore->printStorePath(*out.second);
}
} catch (const std::exception &e) {
throw EvalError("derivation must have valid outputs: %s", e.what());
}
if (myArgs.meta) {
nlohmann::json meta_;
2022-09-17 08:48:25 +00:00
for (auto &metaName : drvInfo.queryMetaNames()) {
2023-06-10 11:28:57 +00:00
NixStringContext context;
std::stringstream ss;
2022-09-17 08:48:25 +00:00
auto metaValue = drvInfo.queryMeta(metaName);
// Skip non-serialisable types
// TODO: Fix serialisation of derivations to store paths
if (metaValue == 0) {
continue;
}
printValueAsJSON(state, true, *metaValue, noPos, ss, context);
2022-09-17 08:48:25 +00:00
meta_[metaName] = nlohmann::json::parse(ss.str());
}
meta = meta_;
}
if (myArgs.checkCacheStatus) {
isCached = queryIsCached(*localStore, outputs);
}
name = drvInfo.queryName();
system = drvInfo.querySystem();
drvPath = localStore->printStorePath(drvInfo.requireDrvPath());
auto drv = localStore->readDerivation(drvInfo.requireDrvPath());
for (auto &input : drv.inputDrvs) {
inputDrvs[localStore->printStorePath(input.first)] = input.second;
}
}
};
static void to_json(nlohmann::json &json, const Drv &drv) {
json = nlohmann::json{{"name", drv.name},
{"system", drv.system},
{"drvPath", drv.drvPath},
{"outputs", drv.outputs},
{"inputDrvs", drv.inputDrvs}};
if (drv.meta.has_value()) {
json["meta"] = drv.meta.value();
}
if (myArgs.checkCacheStatus) {
json["isCached"] = drv.isCached;
}
}
std::string attrPathJoin(json input) {
return std::accumulate(input.begin(), input.end(), std::string(),
[](std::string ss, std::string s) {
// Escape token if containing dots
if (s.find(".") != std::string::npos) {
s = "\"" + s + "\"";
}
return ss.empty() ? s : ss + "." + s;
});
}
static void worker(ref<EvalState> state, Bindings &autoArgs, AutoCloseFD &to,
AutoCloseFD &from) {
nix::Value *vRoot = [&]() {
if (myArgs.flake) {
auto [flakeRef, fragment, outputSpec] =
parseFlakeRefWithFragmentAndExtendedOutputsSpec(myArgs.releaseExpr,
absPath("."));
InstallableFlake flake {
{}, state, std::move(flakeRef), fragment,
2023-07-14 05:03:41 +00:00
outputSpec, {}, {}, myArgs.lockFlags
};
return flake.toValue(*state).first;
} else {
return releaseExprTopLevelValue(*state, autoArgs);
}
}();
2020-11-29 14:33:55 +00:00
while (true) {
/* Wait for the collector to send us a job name. */
2020-11-29 14:33:55 +00:00
writeLine(to.get(), "next");
auto s = readLine(from.get());
if (s == "exit")
break;
if (!hasPrefix(s, "do "))
abort();
auto path = json::parse(s.substr(3));
auto attrPathS = attrPathJoin(path);
2020-11-29 14:33:55 +00:00
debug("worker process %d at '%s'", getpid(), path);
2020-11-29 14:33:55 +00:00
/* Evaluate it and send info back to the collector. */
json reply = json{{"attr", attrPathS}, {"attrPath", path}};
2020-11-29 14:33:55 +00:00
try {
auto vTmp =
findAlongAttrPath(*state, attrPathS, autoArgs, *vRoot).first;
2020-11-29 14:33:55 +00:00
auto v = state->allocValue();
state->autoCallFunction(autoArgs, *vTmp, *v);
2020-11-29 14:33:55 +00:00
2022-09-09 11:26:15 +00:00
if (v->type() == nAttrs) {
if (auto drvInfo = getDerivation(*state, *v, false)) {
auto drv = Drv(*state, *drvInfo);
2022-09-09 11:26:15 +00:00
reply.update(drv);
/* Register the derivation as a GC root. !!! This
registers roots for jobs that we may have already
done. */
if (myArgs.gcRootsDir != "") {
Path root = myArgs.gcRootsDir + "/" +
std::string(baseNameOf(drv.drvPath));
if (!pathExists(root)) {
auto localStore =
state->store
2022-09-09 11:26:15 +00:00
.dynamic_pointer_cast<LocalFSStore>();
auto storePath =
localStore->parseStorePath(drv.drvPath);
localStore->addPermRoot(storePath, root);
}
2022-04-21 18:22:29 +00:00
}
2022-09-09 11:26:15 +00:00
} else {
auto attrs = nlohmann::json::array();
bool recurse =
myArgs.forceRecurse ||
2022-09-09 11:26:15 +00:00
path.size() == 0; // Dont require `recurseForDerivations
// = true;` for top-level attrset
for (auto &i :
v->attrs->lexicographicOrder(state->symbols)) {
const std::string &name = state->symbols[i->name];
2022-09-09 11:26:15 +00:00
attrs.push_back(name);
if (name == "recurseForDerivations" &&
!myArgs.forceRecurse) {
2022-09-09 11:26:15 +00:00
auto attrv =
v->attrs->get(state->sRecurseForDerivations);
2023-03-09 21:18:00 +00:00
recurse = state->forceBool(
*attrv->value, attrv->pos,
"while evaluating recurseForDerivations");
2022-09-09 11:26:15 +00:00
}
}
2022-09-09 11:26:15 +00:00
if (recurse)
reply["attrs"] = std::move(attrs);
else
reply["attrs"] = nlohmann::json::array();
2020-11-29 14:33:55 +00:00
}
2022-09-09 11:26:15 +00:00
} else {
// We ignore everything that cannot be build
reply["attrs"] = nlohmann::json::array();
2020-11-29 14:33:55 +00:00
}
} catch (EvalError &e) {
2022-01-07 07:20:41 +00:00
auto err = e.info();
std::ostringstream oss;
showErrorInfo(oss, err, loggerSettings.showTrace.get());
auto msg = oss.str();
2020-11-29 14:33:55 +00:00
// Transmits the error we got from the previous evaluation
// in the JSON output.
2021-03-14 22:16:57 +00:00
reply["error"] = filterANSIEscapes(msg, true);
2020-11-29 14:33:55 +00:00
// Don't forget to print it into the STDERR log, this is
// what's shown in the Hydra UI.
2022-01-07 07:20:41 +00:00
printError(e.msg());
2020-11-29 14:33:55 +00:00
}
writeLine(to.get(), reply.dump());
/* If our RSS exceeds the maximum, exit. The collector will
2020-11-29 14:33:55 +00:00
start a new process. */
struct rusage r;
getrusage(RUSAGE_SELF, &r);
if ((size_t)r.ru_maxrss > myArgs.maxMemorySize * 1024)
break;
2020-11-29 14:33:55 +00:00
}
writeLine(to.get(), "restart");
}
typedef std::function<void(ref<EvalState> state, Bindings &autoArgs,
AutoCloseFD &to, AutoCloseFD &from)>
Processor;
/* Auto-cleanup of fork's process and fds. */
struct Proc {
AutoCloseFD to, from;
Pid pid;
Proc(const Processor &proc) {
Pipe toPipe, fromPipe;
toPipe.create();
fromPipe.create();
auto p = startProcess(
[&,
to{std::make_shared<AutoCloseFD>(std::move(fromPipe.writeSide))},
from{
std::make_shared<AutoCloseFD>(std::move(toPipe.readSide))}]() {
debug("created worker process %d", getpid());
try {
auto state = std::make_shared<EvalState>(
myArgs.searchPath, openStore(*myArgs.evalStoreUrl));
Bindings &autoArgs = *myArgs.getAutoArgs(*state);
proc(ref<EvalState>(state), autoArgs, *to, *from);
} catch (Error &e) {
nlohmann::json err;
auto msg = e.msg();
err["error"] = filterANSIEscapes(msg, true);
printError(msg);
writeLine(to->get(), err.dump());
// Don't forget to print it into the STDERR log, this is
// what's shown in the Hydra UI.
writeLine(to->get(), "restart");
}
},
ProcessOptions{.allowVfork = false});
to = std::move(toPipe.writeSide);
from = std::move(fromPipe.readSide);
pid = p;
}
~Proc() {}
};
struct State {
std::set<json> todo = json::array({json::array()});
std::set<json> active;
std::exception_ptr exc;
};
std::function<void()> collector(Sync<State> &state_,
std::condition_variable &wakeup) {
return [&]() {
try {
std::optional<std::unique_ptr<Proc>> proc_;
while (true) {
auto proc = proc_.has_value() ? std::move(proc_.value())
: std::make_unique<Proc>(worker);
/* Check whether the existing worker process is still there. */
auto s = readLine(proc->from.get());
if (s == "restart") {
proc_ = std::nullopt;
continue;
} else if (s != "next") {
auto json = json::parse(s);
throw Error("worker error: %s", (std::string)json["error"]);
}
/* Wait for a job name to become available. */
json attrPath;
while (true) {
checkInterrupt();
auto state(state_.lock());
if ((state->todo.empty() && state->active.empty()) ||
state->exc) {
writeLine(proc->to.get(), "exit");
return;
}
if (!state->todo.empty()) {
attrPath = *state->todo.begin();
state->todo.erase(state->todo.begin());
state->active.insert(attrPath);
break;
} else
state.wait(wakeup);
}
/* Tell the worker to evaluate it. */
writeLine(proc->to.get(), "do " + attrPath.dump());
/* Wait for the response. */
auto respString = readLine(proc->from.get());
auto response = json::parse(respString);
/* Handle the response. */
std::vector<json> newAttrs;
if (response.find("attrs") != response.end()) {
for (auto &i : response["attrs"]) {
json newAttr = json(response["attrPath"]);
newAttr.emplace_back(i);
newAttrs.push_back(newAttr);
}
} else {
auto state(state_.lock());
std::cout << respString << "\n" << std::flush;
}
proc_ = std::move(proc);
/* Add newly discovered job names to the queue. */
{
auto state(state_.lock());
state->active.erase(attrPath);
for (auto p : newAttrs) {
state->todo.insert(p);
}
wakeup.notify_all();
}
}
} catch (...) {
auto state(state_.lock());
state->exc = std::current_exception();
wakeup.notify_all();
}
};
}
int main(int argc, char **argv) {
2020-11-29 14:33:55 +00:00
/* Prevent undeclared dependencies in the evaluation via
$NIX_PATH. */
unsetenv("NIX_PATH");
/* We are doing the garbage collection by killing forks */
setenv("GC_DONT_GC", "1", 1);
2020-11-29 14:33:55 +00:00
return handleExceptions(argv[0], [&]() {
initNix();
initGC();
myArgs.parseCmdline(argvToStrings(argc, argv));
/* FIXME: The build hook in conjunction with import-from-derivation is
* causing "unexpected EOF" during eval */
2020-11-29 14:33:55 +00:00
settings.builders = "";
/* Prevent access to paths outside of the Nix search path and
to the environment. */
2021-03-14 22:32:36 +00:00
evalSettings.restrictEval = false;
2020-11-29 14:33:55 +00:00
/* When building a flake, use pure evaluation (no access to
'getEnv', 'currentSystem' etc. */
if (myArgs.impure) {
evalSettings.pureEval = false;
} else if (myArgs.flake) {
evalSettings.pureEval = true;
}
2020-11-29 14:33:55 +00:00
if (myArgs.releaseExpr == "")
throw UsageError("no expression specified");
2020-11-29 14:33:55 +00:00
if (myArgs.gcRootsDir == "") {
printMsg(lvlError, "warning: `--gc-roots-dir' not specified");
} else {
myArgs.gcRootsDir = std::filesystem::absolute(myArgs.gcRootsDir);
}
2020-11-29 14:33:55 +00:00
2022-01-07 07:20:41 +00:00
if (myArgs.showTrace) {
loggerSettings.showTrace.assign(true);
}
2020-11-29 14:33:55 +00:00
Sync<State> state_;
/* Start a collector thread per worker process. */
2020-11-29 14:33:55 +00:00
std::vector<std::thread> threads;
std::condition_variable wakeup;
2020-11-29 14:33:55 +00:00
for (size_t i = 0; i < myArgs.nrWorkers; i++)
threads.emplace_back(std::thread(collector(state_, wakeup)));
2020-11-29 14:33:55 +00:00
for (auto &thread : threads)
2020-11-29 14:33:55 +00:00
thread.join();
auto state(state_.lock());
if (state->exc)
std::rethrow_exception(state->exc);
});
}