handle broken evaluation worker pipes on write

This commit is contained in:
Jörg Thalheim 2023-12-10 08:16:54 +01:00 committed by mergify[bot]
parent 36483b325c
commit 93972c0c18

View file

@ -299,6 +299,46 @@ std::string attrPathJoin(json input) {
return 0; return 0;
} }
class LineReader {
public:
LineReader(int fd) {
stream = fdopen(fd, "r");
if (!stream) {
throw Error("fdopen failed: %s", strerror(errno));
}
}
~LineReader() {
fclose(stream);
free(buffer);
}
LineReader(LineReader &&other) {
stream = other.stream;
other.stream = nullptr;
buffer = other.buffer;
other.buffer = nullptr;
len = other.len;
other.len = 0;
}
[[nodiscard]] std::string_view readLine() {
ssize_t read = getline(&buffer, &len, stream);
if (read == -1) {
return {}; // Return an empty string_view in case of error
}
// Remove trailing newline
return std::string_view(buffer, read - 1);
}
private:
FILE *stream = nullptr;
char *buffer = nullptr;
size_t len = 0;
};
static void worker(ref<EvalState> state, Bindings &autoArgs, AutoCloseFD &to, static void worker(ref<EvalState> state, Bindings &autoArgs, AutoCloseFD &to,
AutoCloseFD &from) { AutoCloseFD &from) {
@ -317,13 +357,15 @@ static void worker(ref<EvalState> state, Bindings &autoArgs, AutoCloseFD &to,
} }
}(); }();
LineReader fromReader(from.release());
while (true) { while (true) {
/* Wait for the collector to send us a job name. */ /* Wait for the collector to send us a job name. */
if (tryWriteLine(to.get(), "next") < 0) { if (tryWriteLine(to.get(), "next") < 0) {
return; // main process died return; // main process died
} }
auto s = readLine(from.get()); auto s = fromReader.readLine();
if (s == "exit") if (s == "exit")
break; break;
if (!hasPrefix(s, "do ")) if (!hasPrefix(s, "do "))
@ -478,19 +520,28 @@ struct State {
std::exception_ptr exc; std::exception_ptr exc;
}; };
void handleBrokenWorkerPipe(pid_t child) { void handleBrokenWorkerPipe(Proc &proc) {
while (1) { while (1) {
int rc = waitpid(child, nullptr, WNOHANG); int rc = waitpid(proc.pid, nullptr, WNOHANG);
if (rc == 0) { if (rc == 0) {
proc.pid = -1; // we already took the process status from Proc, no
// need to wait for it again to avoid error messages
throw Error("BUG: worker pipe closed but worker still running?"); throw Error("BUG: worker pipe closed but worker still running?");
} else if (rc == -1) { } else if (rc == -1) {
proc.pid = -1;
throw Error("BUG: waitpid waiting for worker failed: %s", throw Error("BUG: waitpid waiting for worker failed: %s",
strerror(errno)); strerror(errno));
} else { } else {
if (WIFEXITED(rc)) { if (WIFEXITED(rc)) {
proc.pid = -1;
throw Error("evaluation worker exited with %d", throw Error("evaluation worker exited with %d",
WEXITSTATUS(rc)); WEXITSTATUS(rc));
} else if (WIFSIGNALED(rc)) { } else if (WIFSIGNALED(rc)) {
proc.pid = -1;
if (WTERMSIG(rc) == SIGKILL) {
throw Error("evaluation worker killed by SIGKILL, maybe "
"memory limit reached?");
}
throw Error("evaluation worker killed by signal %d", throw Error("evaluation worker killed by signal %d",
WTERMSIG(rc)); WTERMSIG(rc));
} // else ignore WIFSTOPPED and WIFCONTINUED } // else ignore WIFSTOPPED and WIFCONTINUED
@ -503,20 +554,35 @@ std::function<void()> collector(Sync<State> &state_,
return [&]() { return [&]() {
try { try {
std::optional<std::unique_ptr<Proc>> proc_; std::optional<std::unique_ptr<Proc>> proc_;
std::optional<std::unique_ptr<LineReader>> fromReader_;
while (true) { while (true) {
if (!proc_.has_value()) {
auto proc = proc_.has_value() ? std::move(proc_.value()) proc_ = std::make_unique<Proc>(worker);
: std::make_unique<Proc>(worker); fromReader_ = std::make_unique<LineReader>(
proc_.value()->from.release());
}
auto proc = std::move(proc_.value());
auto fromReader = std::move(fromReader_.value());
/* Check whether the existing worker process is still there. */ /* Check whether the existing worker process is still there. */
auto s = readLine(proc->from.get()); auto s = fromReader->readLine();
if (s == "restart") { if (s == "") {
handleBrokenWorkerPipe(*proc.get());
} else if (s == "restart") {
proc_ = std::nullopt; proc_ = std::nullopt;
fromReader_ = std::nullopt;
continue; continue;
} else if (s != "next") { } else if (s != "next") {
auto json = json::parse(s); try {
throw Error("worker error: %s", (std::string)json["error"]); auto json = json::parse(s);
throw Error("worker error: %s",
(std::string)json["error"]);
} catch (const json::exception &e) {
throw Error(
"Received invalid JSON from worker: %s '%s'",
e.what(), s);
}
} }
/* Wait for a job name to become available. */ /* Wait for a job name to become available. */
@ -528,7 +594,7 @@ std::function<void()> collector(Sync<State> &state_,
if ((state->todo.empty() && state->active.empty()) || if ((state->todo.empty() && state->active.empty()) ||
state->exc) { state->exc) {
if (tryWriteLine(proc->to.get(), "exit") < 0) { if (tryWriteLine(proc->to.get(), "exit") < 0) {
handleBrokenWorkerPipe(proc->pid); handleBrokenWorkerPipe(*proc.get());
} }
return; return;
} }
@ -543,12 +609,25 @@ std::function<void()> collector(Sync<State> &state_,
/* Tell the worker to evaluate it. */ /* Tell the worker to evaluate it. */
if (tryWriteLine(proc->to.get(), "do " + attrPath.dump()) < 0) { if (tryWriteLine(proc->to.get(), "do " + attrPath.dump()) < 0) {
handleBrokenWorkerPipe(proc->pid); handleBrokenWorkerPipe(*proc.get());
} }
/* Wait for the response. */ /* Wait for the response. */
auto respString = readLine(proc->from.get()); auto respString = fromReader->readLine();
auto response = json::parse(respString); if (respString == "") {
handleBrokenWorkerPipe(*proc.get());
}
json response;
try {
response = json::parse(respString);
if (response.find("error") != response.end()) {
throw Error("worker error: %s",
(std::string)response["error"]);
}
} catch (const json::exception &e) {
throw Error("Received invalid JSON from worker: %s '%s'",
e.what(), respString);
}
/* Handle the response. */ /* Handle the response. */
std::vector<json> newAttrs; std::vector<json> newAttrs;
@ -564,6 +643,7 @@ std::function<void()> collector(Sync<State> &state_,
} }
proc_ = std::move(proc); proc_ = std::move(proc);
fromReader_ = std::move(fromReader);
/* Add newly discovered job names to the queue. */ /* Add newly discovered job names to the queue. */
{ {