split nix-eval-jobs further into smaller files
This commit is contained in:
parent
880c66a7d1
commit
a03f039a56
52
src/buffered-io.cc
Normal file
52
src/buffered-io.cc
Normal file
|
@ -0,0 +1,52 @@
|
|||
#include "buffered-io.hh"
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
#include <nix/signals.hh>
|
||||
|
||||
[[nodiscard]] int tryWriteLine(int fd, std::string s) {
|
||||
s += "\n";
|
||||
std::string_view sv{s};
|
||||
while (!sv.empty()) {
|
||||
nix::checkInterrupt();
|
||||
ssize_t res = write(fd, sv.data(), sv.size());
|
||||
if (res == -1 && errno != EINTR) {
|
||||
return -errno;
|
||||
}
|
||||
if (res > 0) {
|
||||
sv.remove_prefix(res);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
LineReader::LineReader(int fd) {
|
||||
stream = fdopen(fd, "r");
|
||||
if (!stream) {
|
||||
throw nix::Error("fdopen failed: %s", strerror(errno));
|
||||
}
|
||||
}
|
||||
|
||||
LineReader::~LineReader() {
|
||||
fclose(stream);
|
||||
free(buffer);
|
||||
}
|
||||
|
||||
LineReader::LineReader(LineReader &&other) {
|
||||
stream = other.stream;
|
||||
other.stream = nullptr;
|
||||
buffer = other.buffer;
|
||||
other.buffer = nullptr;
|
||||
len = other.len;
|
||||
other.len = 0;
|
||||
}
|
||||
|
||||
[[nodiscard]] std::string_view LineReader::readLine() {
|
||||
ssize_t read = getline(&buffer, &len, stream);
|
||||
|
||||
if (read == -1) {
|
||||
return {}; // Return an empty string_view in case of error
|
||||
}
|
||||
|
||||
// Remove trailing newline
|
||||
return std::string_view(buffer, read - 1);
|
||||
}
|
20
src/buffered-io.hh
Normal file
20
src/buffered-io.hh
Normal file
|
@ -0,0 +1,20 @@
|
|||
#pragma once
|
||||
#include <cstdio>
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
|
||||
[[nodiscard]] int tryWriteLine(int fd, std::string s);
|
||||
|
||||
class LineReader {
|
||||
public:
|
||||
LineReader(int fd);
|
||||
~LineReader();
|
||||
|
||||
LineReader(LineReader &&other);
|
||||
[[nodiscard]] std::string_view readLine();
|
||||
|
||||
private:
|
||||
FILE *stream = nullptr;
|
||||
char *buffer = nullptr;
|
||||
size_t len = 0;
|
||||
};
|
|
@ -1,4 +1,4 @@
|
|||
#include "drvs.hh"
|
||||
#include "drv.hh"
|
||||
#include <nix/config.h>
|
||||
#include <nix/path-with-outputs.hh>
|
||||
#include <nix/store-api.hh>
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
src = [
|
||||
'nix-eval-jobs.cc',
|
||||
'eval-args.cc',
|
||||
'drv.cc'
|
||||
'drv.cc',
|
||||
'buffered-io.cc',
|
||||
'worker.cc'
|
||||
]
|
||||
|
||||
executable('nix-eval-jobs', src,
|
||||
|
|
|
@ -9,19 +9,18 @@
|
|||
|
||||
#include <nix/shared.hh>
|
||||
#include <nix/sync.hh>
|
||||
#include <nix/terminal.hh>
|
||||
#include <nix/eval.hh>
|
||||
#include <nix/local-fs-store.hh>
|
||||
#include <nix/installable-flake.hh>
|
||||
#include <nix/get-drvs.hh>
|
||||
#include <nix/attr-path.hh>
|
||||
#include <nix/value-to-json.hh>
|
||||
#include <nix/local-fs-store.hh>
|
||||
#include <nix/signals.hh>
|
||||
#include <nix/terminal.hh>
|
||||
#include <sys/wait.h>
|
||||
#include <sys/resource.h>
|
||||
|
||||
#include "eval-args.hh"
|
||||
#include "drv.hh"
|
||||
#include "buffered-io.hh"
|
||||
#include "worker.hh"
|
||||
|
||||
#include <nlohmann/json.hpp>
|
||||
|
||||
|
@ -36,224 +35,8 @@ using namespace nlohmann;
|
|||
#endif
|
||||
static MyArgs myArgs;
|
||||
|
||||
static Value *releaseExprTopLevelValue(EvalState &state, Bindings &autoArgs) {
|
||||
Value vTop;
|
||||
|
||||
if (myArgs.fromArgs) {
|
||||
Expr *e = state.parseExprFromString(
|
||||
myArgs.releaseExpr, state.rootPath(CanonPath::fromCwd()));
|
||||
state.eval(e, vTop);
|
||||
} else {
|
||||
state.evalFile(lookupFileArg(state, myArgs.releaseExpr), vTop);
|
||||
}
|
||||
|
||||
auto vRoot = state.allocValue();
|
||||
|
||||
state.autoCallFunction(autoArgs, vTop, *vRoot);
|
||||
|
||||
return vRoot;
|
||||
}
|
||||
|
||||
|
||||
std::string attrPathJoin(json input) {
|
||||
return std::accumulate(input.begin(), input.end(), std::string(),
|
||||
[](std::string ss, std::string s) {
|
||||
// Escape token if containing dots
|
||||
if (s.find(".") != std::string::npos) {
|
||||
s = "\"" + s + "\"";
|
||||
}
|
||||
return ss.empty() ? s : ss + "." + s;
|
||||
});
|
||||
}
|
||||
|
||||
[[nodiscard]] static int tryWriteLine(int fd, std::string s) {
|
||||
s += "\n";
|
||||
std::string_view sv{s};
|
||||
while (!sv.empty()) {
|
||||
checkInterrupt();
|
||||
ssize_t res = write(fd, sv.data(), sv.size());
|
||||
if (res == -1 && errno != EINTR) {
|
||||
return -errno;
|
||||
}
|
||||
if (res > 0) {
|
||||
sv.remove_prefix(res);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
class LineReader {
|
||||
public:
|
||||
LineReader(int fd) {
|
||||
stream = fdopen(fd, "r");
|
||||
if (!stream) {
|
||||
throw Error("fdopen failed: %s", strerror(errno));
|
||||
}
|
||||
}
|
||||
|
||||
~LineReader() {
|
||||
fclose(stream);
|
||||
free(buffer);
|
||||
}
|
||||
|
||||
LineReader(LineReader &&other) {
|
||||
stream = other.stream;
|
||||
other.stream = nullptr;
|
||||
buffer = other.buffer;
|
||||
other.buffer = nullptr;
|
||||
len = other.len;
|
||||
other.len = 0;
|
||||
}
|
||||
|
||||
[[nodiscard]] std::string_view readLine() {
|
||||
ssize_t read = getline(&buffer, &len, stream);
|
||||
|
||||
if (read == -1) {
|
||||
return {}; // Return an empty string_view in case of error
|
||||
}
|
||||
|
||||
// Remove trailing newline
|
||||
return std::string_view(buffer, read - 1);
|
||||
}
|
||||
|
||||
private:
|
||||
FILE *stream = nullptr;
|
||||
char *buffer = nullptr;
|
||||
size_t len = 0;
|
||||
};
|
||||
|
||||
static void worker(ref<EvalState> state, Bindings &autoArgs, AutoCloseFD &to,
|
||||
AutoCloseFD &from) {
|
||||
|
||||
nix::Value *vRoot = [&]() {
|
||||
if (myArgs.flake) {
|
||||
auto [flakeRef, fragment, outputSpec] =
|
||||
parseFlakeRefWithFragmentAndExtendedOutputsSpec(
|
||||
myArgs.releaseExpr, absPath("."));
|
||||
InstallableFlake flake{
|
||||
{}, state, std::move(flakeRef), fragment, outputSpec,
|
||||
{}, {}, myArgs.lockFlags};
|
||||
|
||||
return flake.toValue(*state).first;
|
||||
} else {
|
||||
return releaseExprTopLevelValue(*state, autoArgs);
|
||||
}
|
||||
}();
|
||||
|
||||
LineReader fromReader(from.release());
|
||||
|
||||
while (true) {
|
||||
/* Wait for the collector to send us a job name. */
|
||||
if (tryWriteLine(to.get(), "next") < 0) {
|
||||
return; // main process died
|
||||
}
|
||||
|
||||
auto s = fromReader.readLine();
|
||||
if (s == "exit") {
|
||||
break;
|
||||
}
|
||||
if (!hasPrefix(s, "do ")) {
|
||||
fprintf(stderr, "worker error: received invalid command '%s'\n",
|
||||
s.data());
|
||||
abort();
|
||||
}
|
||||
auto path = json::parse(s.substr(3));
|
||||
auto attrPathS = attrPathJoin(path);
|
||||
|
||||
debug("worker process %d at '%s'", getpid(), path);
|
||||
|
||||
/* Evaluate it and send info back to the collector. */
|
||||
json reply = json{{"attr", attrPathS}, {"attrPath", path}};
|
||||
try {
|
||||
auto vTmp =
|
||||
findAlongAttrPath(*state, attrPathS, autoArgs, *vRoot).first;
|
||||
|
||||
auto v = state->allocValue();
|
||||
state->autoCallFunction(autoArgs, *vTmp, *v);
|
||||
|
||||
if (v->type() == nAttrs) {
|
||||
if (auto drvInfo = getDerivation(*state, *v, false)) {
|
||||
auto drv = Drv(attrPathS, *state, *drvInfo, myArgs);
|
||||
reply.update(drv);
|
||||
|
||||
/* Register the derivation as a GC root. !!! This
|
||||
registers roots for jobs that we may have already
|
||||
done. */
|
||||
if (myArgs.gcRootsDir != "") {
|
||||
Path root = myArgs.gcRootsDir + "/" +
|
||||
std::string(baseNameOf(drv.drvPath));
|
||||
if (!pathExists(root)) {
|
||||
auto localStore =
|
||||
state->store
|
||||
.dynamic_pointer_cast<LocalFSStore>();
|
||||
auto storePath =
|
||||
localStore->parseStorePath(drv.drvPath);
|
||||
localStore->addPermRoot(storePath, root);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
auto attrs = nlohmann::json::array();
|
||||
bool recurse =
|
||||
myArgs.forceRecurse ||
|
||||
path.size() == 0; // Dont require `recurseForDerivations
|
||||
// = true;` for top-level attrset
|
||||
|
||||
for (auto &i :
|
||||
v->attrs->lexicographicOrder(state->symbols)) {
|
||||
const std::string &name = state->symbols[i->name];
|
||||
attrs.push_back(name);
|
||||
|
||||
if (name == "recurseForDerivations" &&
|
||||
!myArgs.forceRecurse) {
|
||||
auto attrv =
|
||||
v->attrs->get(state->sRecurseForDerivations);
|
||||
recurse = state->forceBool(
|
||||
*attrv->value, attrv->pos,
|
||||
"while evaluating recurseForDerivations");
|
||||
}
|
||||
}
|
||||
if (recurse)
|
||||
reply["attrs"] = std::move(attrs);
|
||||
else
|
||||
reply["attrs"] = nlohmann::json::array();
|
||||
}
|
||||
} else {
|
||||
// We ignore everything that cannot be build
|
||||
reply["attrs"] = nlohmann::json::array();
|
||||
}
|
||||
} catch (EvalError &e) {
|
||||
auto err = e.info();
|
||||
std::ostringstream oss;
|
||||
showErrorInfo(oss, err, loggerSettings.showTrace.get());
|
||||
auto msg = oss.str();
|
||||
|
||||
// Transmits the error we got from the previous evaluation
|
||||
// in the JSON output.
|
||||
reply["error"] = filterANSIEscapes(msg, true);
|
||||
// Don't forget to print it into the STDERR log, this is
|
||||
// what's shown in the Hydra UI.
|
||||
printError(e.msg());
|
||||
}
|
||||
|
||||
if (tryWriteLine(to.get(), reply.dump()) < 0) {
|
||||
return; // main process died
|
||||
}
|
||||
|
||||
/* If our RSS exceeds the maximum, exit. The collector will
|
||||
start a new process. */
|
||||
struct rusage r;
|
||||
getrusage(RUSAGE_SELF, &r);
|
||||
if ((size_t)r.ru_maxrss > myArgs.maxMemorySize * 1024)
|
||||
break;
|
||||
}
|
||||
|
||||
if (tryWriteLine(to.get(), "restart") < 0) {
|
||||
return; // main process died
|
||||
};
|
||||
}
|
||||
|
||||
typedef std::function<void(ref<EvalState> state, Bindings &autoArgs,
|
||||
AutoCloseFD &to, AutoCloseFD &from)>
|
||||
AutoCloseFD &to, AutoCloseFD &from, MyArgs &args)>
|
||||
Processor;
|
||||
|
||||
/* Auto-cleanup of fork's process and fds. */
|
||||
|
@ -275,11 +58,11 @@ struct Proc {
|
|||
auto state = std::make_shared<EvalState>(
|
||||
myArgs.searchPath, openStore(*myArgs.evalStoreUrl));
|
||||
Bindings &autoArgs = *myArgs.getAutoArgs(*state);
|
||||
proc(ref<EvalState>(state), autoArgs, *to, *from);
|
||||
proc(ref<EvalState>(state), autoArgs, *to, *from, myArgs);
|
||||
} catch (Error &e) {
|
||||
nlohmann::json err;
|
||||
auto msg = e.msg();
|
||||
err["error"] = filterANSIEscapes(msg, true);
|
||||
err["error"] = nix::filterANSIEscapes(msg, true);
|
||||
printError(msg);
|
||||
if (tryWriteLine(to->get(), err.dump()) < 0) {
|
||||
return; // main process died
|
||||
|
|
173
src/worker.cc
Normal file
173
src/worker.cc
Normal file
|
@ -0,0 +1,173 @@
|
|||
#include "worker.hh"
|
||||
#include "drv.hh"
|
||||
#include "buffered-io.hh"
|
||||
|
||||
#include <nix/terminal.hh>
|
||||
#include <nix/attr-path.hh>
|
||||
#include <nix/local-fs-store.hh>
|
||||
#include <nix/installable-flake.hh>
|
||||
|
||||
#include <sys/resource.h>
|
||||
#include <nlohmann/json.hpp>
|
||||
|
||||
static nix::Value *releaseExprTopLevelValue(nix::EvalState &state,
|
||||
nix::Bindings &autoArgs,
|
||||
MyArgs &args) {
|
||||
nix::Value vTop;
|
||||
|
||||
if (args.fromArgs) {
|
||||
nix::Expr *e = state.parseExprFromString(
|
||||
args.releaseExpr, state.rootPath(nix::CanonPath::fromCwd()));
|
||||
state.eval(e, vTop);
|
||||
} else {
|
||||
state.evalFile(lookupFileArg(state, args.releaseExpr), vTop);
|
||||
}
|
||||
|
||||
auto vRoot = state.allocValue();
|
||||
|
||||
state.autoCallFunction(autoArgs, vTop, *vRoot);
|
||||
|
||||
return vRoot;
|
||||
}
|
||||
|
||||
static std::string attrPathJoin(nlohmann::json input) {
|
||||
return std::accumulate(input.begin(), input.end(), std::string(),
|
||||
[](std::string ss, std::string s) {
|
||||
// Escape token if containing dots
|
||||
if (s.find(".") != std::string::npos) {
|
||||
s = "\"" + s + "\"";
|
||||
}
|
||||
return ss.empty() ? s : ss + "." + s;
|
||||
});
|
||||
}
|
||||
|
||||
void worker(nix::ref<nix::EvalState> state, nix::Bindings &autoArgs,
|
||||
nix::AutoCloseFD &to, nix::AutoCloseFD &from, MyArgs &args) {
|
||||
|
||||
nix::Value *vRoot = [&]() {
|
||||
if (args.flake) {
|
||||
auto [flakeRef, fragment, outputSpec] =
|
||||
nix::parseFlakeRefWithFragmentAndExtendedOutputsSpec(
|
||||
args.releaseExpr, nix::absPath("."));
|
||||
nix::InstallableFlake flake{
|
||||
{}, state, std::move(flakeRef), fragment, outputSpec,
|
||||
{}, {}, args.lockFlags};
|
||||
|
||||
return flake.toValue(*state).first;
|
||||
} else {
|
||||
return releaseExprTopLevelValue(*state, autoArgs, args);
|
||||
}
|
||||
}();
|
||||
|
||||
LineReader fromReader(from.release());
|
||||
|
||||
while (true) {
|
||||
/* Wait for the collector to send us a job name. */
|
||||
if (tryWriteLine(to.get(), "next") < 0) {
|
||||
return; // main process died
|
||||
}
|
||||
|
||||
auto s = fromReader.readLine();
|
||||
if (s == "exit") {
|
||||
break;
|
||||
}
|
||||
if (!nix::hasPrefix(s, "do ")) {
|
||||
fprintf(stderr, "worker error: received invalid command '%s'\n",
|
||||
s.data());
|
||||
abort();
|
||||
}
|
||||
auto path = nlohmann::json::parse(s.substr(3));
|
||||
auto attrPathS = attrPathJoin(path);
|
||||
|
||||
/* Evaluate it and send info back to the collector. */
|
||||
nlohmann::json reply =
|
||||
nlohmann::json{{"attr", attrPathS}, {"attrPath", path}};
|
||||
try {
|
||||
auto vTmp =
|
||||
nix::findAlongAttrPath(*state, attrPathS, autoArgs, *vRoot)
|
||||
.first;
|
||||
|
||||
auto v = state->allocValue();
|
||||
state->autoCallFunction(autoArgs, *vTmp, *v);
|
||||
|
||||
if (v->type() == nix::nAttrs) {
|
||||
if (auto drvInfo = nix::getDerivation(*state, *v, false)) {
|
||||
auto drv = Drv(attrPathS, *state, *drvInfo, args);
|
||||
reply.update(drv);
|
||||
|
||||
/* Register the derivation as a GC root. !!! This
|
||||
registers roots for jobs that we may have already
|
||||
done. */
|
||||
if (args.gcRootsDir != "") {
|
||||
nix::Path root =
|
||||
args.gcRootsDir + "/" +
|
||||
std::string(nix::baseNameOf(drv.drvPath));
|
||||
if (!nix::pathExists(root)) {
|
||||
auto localStore =
|
||||
state->store
|
||||
.dynamic_pointer_cast<nix::LocalFSStore>();
|
||||
auto storePath =
|
||||
localStore->parseStorePath(drv.drvPath);
|
||||
localStore->addPermRoot(storePath, root);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
auto attrs = nlohmann::json::array();
|
||||
bool recurse =
|
||||
args.forceRecurse ||
|
||||
path.size() == 0; // Dont require `recurseForDerivations
|
||||
// = true;` for top-level attrset
|
||||
|
||||
for (auto &i :
|
||||
v->attrs->lexicographicOrder(state->symbols)) {
|
||||
const std::string &name = state->symbols[i->name];
|
||||
attrs.push_back(name);
|
||||
|
||||
if (name == "recurseForDerivations" &&
|
||||
!args.forceRecurse) {
|
||||
auto attrv =
|
||||
v->attrs->get(state->sRecurseForDerivations);
|
||||
recurse = state->forceBool(
|
||||
*attrv->value, attrv->pos,
|
||||
"while evaluating recurseForDerivations");
|
||||
}
|
||||
}
|
||||
if (recurse)
|
||||
reply["attrs"] = std::move(attrs);
|
||||
else
|
||||
reply["attrs"] = nlohmann::json::array();
|
||||
}
|
||||
} else {
|
||||
// We ignore everything that cannot be build
|
||||
reply["attrs"] = nlohmann::json::array();
|
||||
}
|
||||
} catch (nix::EvalError &e) {
|
||||
auto err = e.info();
|
||||
std::ostringstream oss;
|
||||
nix::showErrorInfo(oss, err, nix::loggerSettings.showTrace.get());
|
||||
auto msg = oss.str();
|
||||
|
||||
// Transmits the error we got from the previous evaluation
|
||||
// in the JSON output.
|
||||
reply["error"] = nix::filterANSIEscapes(msg, true);
|
||||
// Don't forget to print it into the STDERR log, this is
|
||||
// what's shown in the Hydra UI.
|
||||
fprintf(stderr, "%s\n", msg.c_str());
|
||||
}
|
||||
|
||||
if (tryWriteLine(to.get(), reply.dump()) < 0) {
|
||||
return; // main process died
|
||||
}
|
||||
|
||||
/* If our RSS exceeds the maximum, exit. The collector will
|
||||
start a new process. */
|
||||
struct rusage r;
|
||||
getrusage(RUSAGE_SELF, &r);
|
||||
if ((size_t)r.ru_maxrss > args.maxMemorySize * 1024)
|
||||
break;
|
||||
}
|
||||
|
||||
if (tryWriteLine(to.get(), "restart") < 0) {
|
||||
return; // main process died
|
||||
};
|
||||
}
|
9
src/worker.hh
Normal file
9
src/worker.hh
Normal file
|
@ -0,0 +1,9 @@
|
|||
#pragma once
|
||||
#include <nix/config.h>
|
||||
#include <nix/shared.hh>
|
||||
#include <nix/eval.hh>
|
||||
|
||||
#include "eval-args.hh"
|
||||
|
||||
void worker(nix::ref<nix::EvalState> state, nix::Bindings &autoArgs,
|
||||
nix::AutoCloseFD &to, nix::AutoCloseFD &from, MyArgs &args);
|
Loading…
Reference in a new issue