handle broken evaluation worker pipes more gracefully

writeLine will throw a SysError exception, which obfuscates out-of-memory events where the eval worker is killed by the OS.
readLine is suffering from the same problem and will be handled in a subsequent commit.
This commit is contained in:
Jörg Thalheim 2023-12-09 17:17:01 +01:00 committed by mergify[bot]
parent 40ad828088
commit 5ad4e7266f

View file

@ -2,7 +2,6 @@
#include <iostream> #include <iostream>
#include <thread> #include <thread>
#include <filesystem> #include <filesystem>
#include <nix/eval-settings.hh> #include <nix/eval-settings.hh>
#include <nix/config.h> #include <nix/config.h>
#include <nix/shared.hh> #include <nix/shared.hh>
@ -284,6 +283,22 @@ std::string attrPathJoin(json input) {
}); });
} }
[[nodiscard]] static int tryWriteLine(int fd, std::string s) {
s += "\n";
std::string_view sv{s};
while (!sv.empty()) {
checkInterrupt();
ssize_t res = write(fd, sv.data(), sv.size());
if (res == -1 && errno != EINTR) {
return -errno;
}
if (res > 0) {
sv.remove_prefix(res);
}
}
return 0;
}
static void worker(ref<EvalState> state, Bindings &autoArgs, AutoCloseFD &to, static void worker(ref<EvalState> state, Bindings &autoArgs, AutoCloseFD &to,
AutoCloseFD &from) { AutoCloseFD &from) {
@ -304,7 +319,9 @@ static void worker(ref<EvalState> state, Bindings &autoArgs, AutoCloseFD &to,
while (true) { while (true) {
/* Wait for the collector to send us a job name. */ /* Wait for the collector to send us a job name. */
writeLine(to.get(), "next"); if (tryWriteLine(to.get(), "next") < 0) {
return; // main process died
}
auto s = readLine(from.get()); auto s = readLine(from.get());
if (s == "exit") if (s == "exit")
@ -389,7 +406,9 @@ static void worker(ref<EvalState> state, Bindings &autoArgs, AutoCloseFD &to,
printError(e.msg()); printError(e.msg());
} }
writeLine(to.get(), reply.dump()); if (tryWriteLine(to.get(), reply.dump()) < 0) {
return; // main process died
}
/* If our RSS exceeds the maximum, exit. The collector will /* If our RSS exceeds the maximum, exit. The collector will
start a new process. */ start a new process. */
@ -399,7 +418,9 @@ static void worker(ref<EvalState> state, Bindings &autoArgs, AutoCloseFD &to,
break; break;
} }
writeLine(to.get(), "restart"); if (tryWriteLine(to.get(), "restart") < 0) {
return; // main process died
};
} }
typedef std::function<void(ref<EvalState> state, Bindings &autoArgs, typedef std::function<void(ref<EvalState> state, Bindings &autoArgs,
@ -431,10 +452,14 @@ struct Proc {
auto msg = e.msg(); auto msg = e.msg();
err["error"] = filterANSIEscapes(msg, true); err["error"] = filterANSIEscapes(msg, true);
printError(msg); printError(msg);
writeLine(to->get(), err.dump()); if (tryWriteLine(to->get(), err.dump()) < 0) {
return; // main process died
};
// Don't forget to print it into the STDERR log, this is // Don't forget to print it into the STDERR log, this is
// what's shown in the Hydra UI. // what's shown in the Hydra UI.
writeLine(to->get(), "restart"); if (tryWriteLine(to->get(), "restart") < 0) {
return; // main process died
}
} }
}, },
ProcessOptions{.allowVfork = false}); ProcessOptions{.allowVfork = false});
@ -453,6 +478,26 @@ struct State {
std::exception_ptr exc; std::exception_ptr exc;
}; };
void handleBrokenWorkerPipe(pid_t child) {
while (1) {
int rc = waitpid(child, nullptr, WNOHANG);
if (rc == 0) {
throw Error("BUG: worker pipe closed but worker still running?");
} else if (rc == -1) {
throw Error("BUG: waitpid waiting for worker failed: %s",
strerror(errno));
} else {
if (WIFEXITED(rc)) {
throw Error("evaluation worker exited with %d",
WEXITSTATUS(rc));
} else if (WIFSIGNALED(rc)) {
throw Error("evaluation worker killed by signal %d",
WTERMSIG(rc));
} // else ignore WIFSTOPPED and WIFCONTINUED
}
}
}
std::function<void()> collector(Sync<State> &state_, std::function<void()> collector(Sync<State> &state_,
std::condition_variable &wakeup) { std::condition_variable &wakeup) {
return [&]() { return [&]() {
@ -482,7 +527,9 @@ std::function<void()> collector(Sync<State> &state_,
auto state(state_.lock()); auto state(state_.lock());
if ((state->todo.empty() && state->active.empty()) || if ((state->todo.empty() && state->active.empty()) ||
state->exc) { state->exc) {
writeLine(proc->to.get(), "exit"); if (tryWriteLine(proc->to.get(), "exit") < 0) {
handleBrokenWorkerPipe(proc->pid);
}
return; return;
} }
if (!state->todo.empty()) { if (!state->todo.empty()) {
@ -495,7 +542,9 @@ std::function<void()> collector(Sync<State> &state_,
} }
/* Tell the worker to evaluate it. */ /* Tell the worker to evaluate it. */
writeLine(proc->to.get(), "do " + attrPath.dump()); if (tryWriteLine(proc->to.get(), "do " + attrPath.dump()) < 0) {
handleBrokenWorkerPipe(proc->pid);
}
/* Wait for the response. */ /* Wait for the response. */
auto respString = readLine(proc->from.get()); auto respString = readLine(proc->from.get());