From a03f039a562f0b51bbbd9e5032a5b1042768d7cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Thalheim?= Date: Sun, 10 Dec 2023 15:41:51 +0100 Subject: [PATCH] split nix-eval-jobs further into smaller files --- src/buffered-io.cc | 52 ++++++++++ src/buffered-io.hh | 20 ++++ src/drv.cc | 2 +- src/meson.build | 4 +- src/nix-eval-jobs.cc | 231 ++----------------------------------------- src/worker.cc | 173 ++++++++++++++++++++++++++++++++ src/worker.hh | 9 ++ 7 files changed, 265 insertions(+), 226 deletions(-) create mode 100644 src/buffered-io.cc create mode 100644 src/buffered-io.hh create mode 100644 src/worker.cc create mode 100644 src/worker.hh diff --git a/src/buffered-io.cc b/src/buffered-io.cc new file mode 100644 index 0000000..6d3ab6d --- /dev/null +++ b/src/buffered-io.cc @@ -0,0 +1,52 @@ +#include "buffered-io.hh" +#include +#include +#include + +[[nodiscard]] int tryWriteLine(int fd, std::string s) { + s += "\n"; + std::string_view sv{s}; + while (!sv.empty()) { + nix::checkInterrupt(); + ssize_t res = write(fd, sv.data(), sv.size()); + if (res == -1 && errno != EINTR) { + return -errno; + } + if (res > 0) { + sv.remove_prefix(res); + } + } + return 0; +} + +LineReader::LineReader(int fd) { + stream = fdopen(fd, "r"); + if (!stream) { + throw nix::Error("fdopen failed: %s", strerror(errno)); + } +} + +LineReader::~LineReader() { + fclose(stream); + free(buffer); +} + +LineReader::LineReader(LineReader &&other) { + stream = other.stream; + other.stream = nullptr; + buffer = other.buffer; + other.buffer = nullptr; + len = other.len; + other.len = 0; +} + +[[nodiscard]] std::string_view LineReader::readLine() { + ssize_t read = getline(&buffer, &len, stream); + + if (read == -1) { + return {}; // Return an empty string_view in case of error + } + + // Remove trailing newline + return std::string_view(buffer, read - 1); +} diff --git a/src/buffered-io.hh b/src/buffered-io.hh new file mode 100644 index 0000000..e1f0685 --- /dev/null +++ b/src/buffered-io.hh @@ -0,0 +1,20 @@ +#pragma once +#include +#include +#include + +[[nodiscard]] int tryWriteLine(int fd, std::string s); + +class LineReader { + public: + LineReader(int fd); + ~LineReader(); + + LineReader(LineReader &&other); + [[nodiscard]] std::string_view readLine(); + + private: + FILE *stream = nullptr; + char *buffer = nullptr; + size_t len = 0; +}; diff --git a/src/drv.cc b/src/drv.cc index dc91ddf..43578f8 100644 --- a/src/drv.cc +++ b/src/drv.cc @@ -1,4 +1,4 @@ -#include "drvs.hh" +#include "drv.hh" #include #include #include diff --git a/src/meson.build b/src/meson.build index 6b8d6dc..cf47097 100644 --- a/src/meson.build +++ b/src/meson.build @@ -1,7 +1,9 @@ src = [ 'nix-eval-jobs.cc', 'eval-args.cc', - 'drv.cc' + 'drv.cc', + 'buffered-io.cc', + 'worker.cc' ] executable('nix-eval-jobs', src, diff --git a/src/nix-eval-jobs.cc b/src/nix-eval-jobs.cc index 670bd48..5a25f36 100644 --- a/src/nix-eval-jobs.cc +++ b/src/nix-eval-jobs.cc @@ -9,19 +9,18 @@ #include #include -#include #include -#include -#include #include -#include #include +#include #include +#include #include -#include #include "eval-args.hh" #include "drv.hh" +#include "buffered-io.hh" +#include "worker.hh" #include @@ -36,224 +35,8 @@ using namespace nlohmann; #endif static MyArgs myArgs; -static Value *releaseExprTopLevelValue(EvalState &state, Bindings &autoArgs) { - Value vTop; - - if (myArgs.fromArgs) { - Expr *e = state.parseExprFromString( - myArgs.releaseExpr, state.rootPath(CanonPath::fromCwd())); - state.eval(e, vTop); - } else { - state.evalFile(lookupFileArg(state, myArgs.releaseExpr), vTop); - } - - auto vRoot = state.allocValue(); - - state.autoCallFunction(autoArgs, vTop, *vRoot); - - return vRoot; -} - - -std::string attrPathJoin(json input) { - return std::accumulate(input.begin(), input.end(), std::string(), - [](std::string ss, std::string s) { - // Escape token if containing dots - if (s.find(".") != std::string::npos) { - s = "\"" + s + "\""; - } - return ss.empty() ? s : ss + "." + s; - }); -} - -[[nodiscard]] static int tryWriteLine(int fd, std::string s) { - s += "\n"; - std::string_view sv{s}; - while (!sv.empty()) { - checkInterrupt(); - ssize_t res = write(fd, sv.data(), sv.size()); - if (res == -1 && errno != EINTR) { - return -errno; - } - if (res > 0) { - sv.remove_prefix(res); - } - } - return 0; -} - -class LineReader { - public: - LineReader(int fd) { - stream = fdopen(fd, "r"); - if (!stream) { - throw Error("fdopen failed: %s", strerror(errno)); - } - } - - ~LineReader() { - fclose(stream); - free(buffer); - } - - LineReader(LineReader &&other) { - stream = other.stream; - other.stream = nullptr; - buffer = other.buffer; - other.buffer = nullptr; - len = other.len; - other.len = 0; - } - - [[nodiscard]] std::string_view readLine() { - ssize_t read = getline(&buffer, &len, stream); - - if (read == -1) { - return {}; // Return an empty string_view in case of error - } - - // Remove trailing newline - return std::string_view(buffer, read - 1); - } - - private: - FILE *stream = nullptr; - char *buffer = nullptr; - size_t len = 0; -}; - -static void worker(ref state, Bindings &autoArgs, AutoCloseFD &to, - AutoCloseFD &from) { - - nix::Value *vRoot = [&]() { - if (myArgs.flake) { - auto [flakeRef, fragment, outputSpec] = - parseFlakeRefWithFragmentAndExtendedOutputsSpec( - myArgs.releaseExpr, absPath(".")); - InstallableFlake flake{ - {}, state, std::move(flakeRef), fragment, outputSpec, - {}, {}, myArgs.lockFlags}; - - return flake.toValue(*state).first; - } else { - return releaseExprTopLevelValue(*state, autoArgs); - } - }(); - - LineReader fromReader(from.release()); - - while (true) { - /* Wait for the collector to send us a job name. */ - if (tryWriteLine(to.get(), "next") < 0) { - return; // main process died - } - - auto s = fromReader.readLine(); - if (s == "exit") { - break; - } - if (!hasPrefix(s, "do ")) { - fprintf(stderr, "worker error: received invalid command '%s'\n", - s.data()); - abort(); - } - auto path = json::parse(s.substr(3)); - auto attrPathS = attrPathJoin(path); - - debug("worker process %d at '%s'", getpid(), path); - - /* Evaluate it and send info back to the collector. */ - json reply = json{{"attr", attrPathS}, {"attrPath", path}}; - try { - auto vTmp = - findAlongAttrPath(*state, attrPathS, autoArgs, *vRoot).first; - - auto v = state->allocValue(); - state->autoCallFunction(autoArgs, *vTmp, *v); - - if (v->type() == nAttrs) { - if (auto drvInfo = getDerivation(*state, *v, false)) { - auto drv = Drv(attrPathS, *state, *drvInfo, myArgs); - reply.update(drv); - - /* Register the derivation as a GC root. !!! This - registers roots for jobs that we may have already - done. */ - if (myArgs.gcRootsDir != "") { - Path root = myArgs.gcRootsDir + "/" + - std::string(baseNameOf(drv.drvPath)); - if (!pathExists(root)) { - auto localStore = - state->store - .dynamic_pointer_cast(); - auto storePath = - localStore->parseStorePath(drv.drvPath); - localStore->addPermRoot(storePath, root); - } - } - } else { - auto attrs = nlohmann::json::array(); - bool recurse = - myArgs.forceRecurse || - path.size() == 0; // Dont require `recurseForDerivations - // = true;` for top-level attrset - - for (auto &i : - v->attrs->lexicographicOrder(state->symbols)) { - const std::string &name = state->symbols[i->name]; - attrs.push_back(name); - - if (name == "recurseForDerivations" && - !myArgs.forceRecurse) { - auto attrv = - v->attrs->get(state->sRecurseForDerivations); - recurse = state->forceBool( - *attrv->value, attrv->pos, - "while evaluating recurseForDerivations"); - } - } - if (recurse) - reply["attrs"] = std::move(attrs); - else - reply["attrs"] = nlohmann::json::array(); - } - } else { - // We ignore everything that cannot be build - reply["attrs"] = nlohmann::json::array(); - } - } catch (EvalError &e) { - auto err = e.info(); - std::ostringstream oss; - showErrorInfo(oss, err, loggerSettings.showTrace.get()); - auto msg = oss.str(); - - // Transmits the error we got from the previous evaluation - // in the JSON output. - reply["error"] = filterANSIEscapes(msg, true); - // Don't forget to print it into the STDERR log, this is - // what's shown in the Hydra UI. - printError(e.msg()); - } - - if (tryWriteLine(to.get(), reply.dump()) < 0) { - return; // main process died - } - - /* If our RSS exceeds the maximum, exit. The collector will - start a new process. */ - struct rusage r; - getrusage(RUSAGE_SELF, &r); - if ((size_t)r.ru_maxrss > myArgs.maxMemorySize * 1024) - break; - } - - if (tryWriteLine(to.get(), "restart") < 0) { - return; // main process died - }; -} - typedef std::function state, Bindings &autoArgs, - AutoCloseFD &to, AutoCloseFD &from)> + AutoCloseFD &to, AutoCloseFD &from, MyArgs &args)> Processor; /* Auto-cleanup of fork's process and fds. */ @@ -275,11 +58,11 @@ struct Proc { auto state = std::make_shared( myArgs.searchPath, openStore(*myArgs.evalStoreUrl)); Bindings &autoArgs = *myArgs.getAutoArgs(*state); - proc(ref(state), autoArgs, *to, *from); + proc(ref(state), autoArgs, *to, *from, myArgs); } catch (Error &e) { nlohmann::json err; auto msg = e.msg(); - err["error"] = filterANSIEscapes(msg, true); + err["error"] = nix::filterANSIEscapes(msg, true); printError(msg); if (tryWriteLine(to->get(), err.dump()) < 0) { return; // main process died diff --git a/src/worker.cc b/src/worker.cc new file mode 100644 index 0000000..a1aa293 --- /dev/null +++ b/src/worker.cc @@ -0,0 +1,173 @@ +#include "worker.hh" +#include "drv.hh" +#include "buffered-io.hh" + +#include +#include +#include +#include + +#include +#include + +static nix::Value *releaseExprTopLevelValue(nix::EvalState &state, + nix::Bindings &autoArgs, + MyArgs &args) { + nix::Value vTop; + + if (args.fromArgs) { + nix::Expr *e = state.parseExprFromString( + args.releaseExpr, state.rootPath(nix::CanonPath::fromCwd())); + state.eval(e, vTop); + } else { + state.evalFile(lookupFileArg(state, args.releaseExpr), vTop); + } + + auto vRoot = state.allocValue(); + + state.autoCallFunction(autoArgs, vTop, *vRoot); + + return vRoot; +} + +static std::string attrPathJoin(nlohmann::json input) { + return std::accumulate(input.begin(), input.end(), std::string(), + [](std::string ss, std::string s) { + // Escape token if containing dots + if (s.find(".") != std::string::npos) { + s = "\"" + s + "\""; + } + return ss.empty() ? s : ss + "." + s; + }); +} + +void worker(nix::ref state, nix::Bindings &autoArgs, + nix::AutoCloseFD &to, nix::AutoCloseFD &from, MyArgs &args) { + + nix::Value *vRoot = [&]() { + if (args.flake) { + auto [flakeRef, fragment, outputSpec] = + nix::parseFlakeRefWithFragmentAndExtendedOutputsSpec( + args.releaseExpr, nix::absPath(".")); + nix::InstallableFlake flake{ + {}, state, std::move(flakeRef), fragment, outputSpec, + {}, {}, args.lockFlags}; + + return flake.toValue(*state).first; + } else { + return releaseExprTopLevelValue(*state, autoArgs, args); + } + }(); + + LineReader fromReader(from.release()); + + while (true) { + /* Wait for the collector to send us a job name. */ + if (tryWriteLine(to.get(), "next") < 0) { + return; // main process died + } + + auto s = fromReader.readLine(); + if (s == "exit") { + break; + } + if (!nix::hasPrefix(s, "do ")) { + fprintf(stderr, "worker error: received invalid command '%s'\n", + s.data()); + abort(); + } + auto path = nlohmann::json::parse(s.substr(3)); + auto attrPathS = attrPathJoin(path); + + /* Evaluate it and send info back to the collector. */ + nlohmann::json reply = + nlohmann::json{{"attr", attrPathS}, {"attrPath", path}}; + try { + auto vTmp = + nix::findAlongAttrPath(*state, attrPathS, autoArgs, *vRoot) + .first; + + auto v = state->allocValue(); + state->autoCallFunction(autoArgs, *vTmp, *v); + + if (v->type() == nix::nAttrs) { + if (auto drvInfo = nix::getDerivation(*state, *v, false)) { + auto drv = Drv(attrPathS, *state, *drvInfo, args); + reply.update(drv); + + /* Register the derivation as a GC root. !!! This + registers roots for jobs that we may have already + done. */ + if (args.gcRootsDir != "") { + nix::Path root = + args.gcRootsDir + "/" + + std::string(nix::baseNameOf(drv.drvPath)); + if (!nix::pathExists(root)) { + auto localStore = + state->store + .dynamic_pointer_cast(); + auto storePath = + localStore->parseStorePath(drv.drvPath); + localStore->addPermRoot(storePath, root); + } + } + } else { + auto attrs = nlohmann::json::array(); + bool recurse = + args.forceRecurse || + path.size() == 0; // Dont require `recurseForDerivations + // = true;` for top-level attrset + + for (auto &i : + v->attrs->lexicographicOrder(state->symbols)) { + const std::string &name = state->symbols[i->name]; + attrs.push_back(name); + + if (name == "recurseForDerivations" && + !args.forceRecurse) { + auto attrv = + v->attrs->get(state->sRecurseForDerivations); + recurse = state->forceBool( + *attrv->value, attrv->pos, + "while evaluating recurseForDerivations"); + } + } + if (recurse) + reply["attrs"] = std::move(attrs); + else + reply["attrs"] = nlohmann::json::array(); + } + } else { + // We ignore everything that cannot be build + reply["attrs"] = nlohmann::json::array(); + } + } catch (nix::EvalError &e) { + auto err = e.info(); + std::ostringstream oss; + nix::showErrorInfo(oss, err, nix::loggerSettings.showTrace.get()); + auto msg = oss.str(); + + // Transmits the error we got from the previous evaluation + // in the JSON output. + reply["error"] = nix::filterANSIEscapes(msg, true); + // Don't forget to print it into the STDERR log, this is + // what's shown in the Hydra UI. + fprintf(stderr, "%s\n", msg.c_str()); + } + + if (tryWriteLine(to.get(), reply.dump()) < 0) { + return; // main process died + } + + /* If our RSS exceeds the maximum, exit. The collector will + start a new process. */ + struct rusage r; + getrusage(RUSAGE_SELF, &r); + if ((size_t)r.ru_maxrss > args.maxMemorySize * 1024) + break; + } + + if (tryWriteLine(to.get(), "restart") < 0) { + return; // main process died + }; +} diff --git a/src/worker.hh b/src/worker.hh new file mode 100644 index 0000000..0f7d735 --- /dev/null +++ b/src/worker.hh @@ -0,0 +1,9 @@ +#pragma once +#include +#include +#include + +#include "eval-args.hh" + +void worker(nix::ref state, nix::Bindings &autoArgs, + nix::AutoCloseFD &to, nix::AutoCloseFD &from, MyArgs &args);