From 93972c0c1887766c8bc4419f29eb42c5caa398b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Thalheim?= Date: Sun, 10 Dec 2023 08:16:54 +0100 Subject: [PATCH] handle broken evaluation worker pipes on write --- src/nix-eval-jobs.cc | 108 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 94 insertions(+), 14 deletions(-) diff --git a/src/nix-eval-jobs.cc b/src/nix-eval-jobs.cc index b766dee..e2e78d7 100644 --- a/src/nix-eval-jobs.cc +++ b/src/nix-eval-jobs.cc @@ -299,6 +299,46 @@ std::string attrPathJoin(json input) { return 0; } +class LineReader { + public: + LineReader(int fd) { + stream = fdopen(fd, "r"); + if (!stream) { + throw Error("fdopen failed: %s", strerror(errno)); + } + } + + ~LineReader() { + fclose(stream); + free(buffer); + } + + LineReader(LineReader &&other) { + stream = other.stream; + other.stream = nullptr; + buffer = other.buffer; + other.buffer = nullptr; + len = other.len; + other.len = 0; + } + + [[nodiscard]] std::string_view readLine() { + ssize_t read = getline(&buffer, &len, stream); + + if (read == -1) { + return {}; // Return an empty string_view in case of error + } + + // Remove trailing newline + return std::string_view(buffer, read - 1); + } + + private: + FILE *stream = nullptr; + char *buffer = nullptr; + size_t len = 0; +}; + static void worker(ref state, Bindings &autoArgs, AutoCloseFD &to, AutoCloseFD &from) { @@ -317,13 +357,15 @@ static void worker(ref state, Bindings &autoArgs, AutoCloseFD &to, } }(); + LineReader fromReader(from.release()); + while (true) { /* Wait for the collector to send us a job name. */ if (tryWriteLine(to.get(), "next") < 0) { return; // main process died } - auto s = readLine(from.get()); + auto s = fromReader.readLine(); if (s == "exit") break; if (!hasPrefix(s, "do ")) @@ -478,19 +520,28 @@ struct State { std::exception_ptr exc; }; -void handleBrokenWorkerPipe(pid_t child) { +void handleBrokenWorkerPipe(Proc &proc) { while (1) { - int rc = waitpid(child, nullptr, WNOHANG); + int rc = waitpid(proc.pid, nullptr, WNOHANG); if (rc == 0) { + proc.pid = -1; // we already took the process status from Proc, no + // need to wait for it again to avoid error messages throw Error("BUG: worker pipe closed but worker still running?"); } else if (rc == -1) { + proc.pid = -1; throw Error("BUG: waitpid waiting for worker failed: %s", strerror(errno)); } else { if (WIFEXITED(rc)) { + proc.pid = -1; throw Error("evaluation worker exited with %d", WEXITSTATUS(rc)); } else if (WIFSIGNALED(rc)) { + proc.pid = -1; + if (WTERMSIG(rc) == SIGKILL) { + throw Error("evaluation worker killed by SIGKILL, maybe " + "memory limit reached?"); + } throw Error("evaluation worker killed by signal %d", WTERMSIG(rc)); } // else ignore WIFSTOPPED and WIFCONTINUED @@ -503,20 +554,35 @@ std::function collector(Sync &state_, return [&]() { try { std::optional> proc_; + std::optional> fromReader_; while (true) { - - auto proc = proc_.has_value() ? std::move(proc_.value()) - : std::make_unique(worker); + if (!proc_.has_value()) { + proc_ = std::make_unique(worker); + fromReader_ = std::make_unique( + proc_.value()->from.release()); + } + auto proc = std::move(proc_.value()); + auto fromReader = std::move(fromReader_.value()); /* Check whether the existing worker process is still there. */ - auto s = readLine(proc->from.get()); - if (s == "restart") { + auto s = fromReader->readLine(); + if (s == "") { + handleBrokenWorkerPipe(*proc.get()); + } else if (s == "restart") { proc_ = std::nullopt; + fromReader_ = std::nullopt; continue; } else if (s != "next") { - auto json = json::parse(s); - throw Error("worker error: %s", (std::string)json["error"]); + try { + auto json = json::parse(s); + throw Error("worker error: %s", + (std::string)json["error"]); + } catch (const json::exception &e) { + throw Error( + "Received invalid JSON from worker: %s '%s'", + e.what(), s); + } } /* Wait for a job name to become available. */ @@ -528,7 +594,7 @@ std::function collector(Sync &state_, if ((state->todo.empty() && state->active.empty()) || state->exc) { if (tryWriteLine(proc->to.get(), "exit") < 0) { - handleBrokenWorkerPipe(proc->pid); + handleBrokenWorkerPipe(*proc.get()); } return; } @@ -543,12 +609,25 @@ std::function collector(Sync &state_, /* Tell the worker to evaluate it. */ if (tryWriteLine(proc->to.get(), "do " + attrPath.dump()) < 0) { - handleBrokenWorkerPipe(proc->pid); + handleBrokenWorkerPipe(*proc.get()); } /* Wait for the response. */ - auto respString = readLine(proc->from.get()); - auto response = json::parse(respString); + auto respString = fromReader->readLine(); + if (respString == "") { + handleBrokenWorkerPipe(*proc.get()); + } + json response; + try { + response = json::parse(respString); + if (response.find("error") != response.end()) { + throw Error("worker error: %s", + (std::string)response["error"]); + } + } catch (const json::exception &e) { + throw Error("Received invalid JSON from worker: %s '%s'", + e.what(), respString); + } /* Handle the response. */ std::vector newAttrs; @@ -564,6 +643,7 @@ std::function collector(Sync &state_, } proc_ = std::move(proc); + fromReader_ = std::move(fromReader); /* Add newly discovered job names to the queue. */ {