diff --git a/src/nix-eval-jobs.cc b/src/nix-eval-jobs.cc index 56b6c49..b766dee 100644 --- a/src/nix-eval-jobs.cc +++ b/src/nix-eval-jobs.cc @@ -2,7 +2,6 @@ #include #include #include - #include #include #include @@ -284,6 +283,22 @@ std::string attrPathJoin(json input) { }); } +[[nodiscard]] static int tryWriteLine(int fd, std::string s) { + s += "\n"; + std::string_view sv{s}; + while (!sv.empty()) { + checkInterrupt(); + ssize_t res = write(fd, sv.data(), sv.size()); + if (res == -1 && errno != EINTR) { + return -errno; + } + if (res > 0) { + sv.remove_prefix(res); + } + } + return 0; +} + static void worker(ref state, Bindings &autoArgs, AutoCloseFD &to, AutoCloseFD &from) { @@ -304,7 +319,9 @@ static void worker(ref state, Bindings &autoArgs, AutoCloseFD &to, while (true) { /* Wait for the collector to send us a job name. */ - writeLine(to.get(), "next"); + if (tryWriteLine(to.get(), "next") < 0) { + return; // main process died + } auto s = readLine(from.get()); if (s == "exit") @@ -389,7 +406,9 @@ static void worker(ref state, Bindings &autoArgs, AutoCloseFD &to, printError(e.msg()); } - writeLine(to.get(), reply.dump()); + if (tryWriteLine(to.get(), reply.dump()) < 0) { + return; // main process died + } /* If our RSS exceeds the maximum, exit. The collector will start a new process. */ @@ -399,7 +418,9 @@ static void worker(ref state, Bindings &autoArgs, AutoCloseFD &to, break; } - writeLine(to.get(), "restart"); + if (tryWriteLine(to.get(), "restart") < 0) { + return; // main process died + }; } typedef std::function state, Bindings &autoArgs, @@ -431,10 +452,14 @@ struct Proc { auto msg = e.msg(); err["error"] = filterANSIEscapes(msg, true); printError(msg); - writeLine(to->get(), err.dump()); + if (tryWriteLine(to->get(), err.dump()) < 0) { + return; // main process died + }; // Don't forget to print it into the STDERR log, this is // what's shown in the Hydra UI. - writeLine(to->get(), "restart"); + if (tryWriteLine(to->get(), "restart") < 0) { + return; // main process died + } } }, ProcessOptions{.allowVfork = false}); @@ -453,6 +478,26 @@ struct State { std::exception_ptr exc; }; +void handleBrokenWorkerPipe(pid_t child) { + while (1) { + int rc = waitpid(child, nullptr, WNOHANG); + if (rc == 0) { + throw Error("BUG: worker pipe closed but worker still running?"); + } else if (rc == -1) { + throw Error("BUG: waitpid waiting for worker failed: %s", + strerror(errno)); + } else { + if (WIFEXITED(rc)) { + throw Error("evaluation worker exited with %d", + WEXITSTATUS(rc)); + } else if (WIFSIGNALED(rc)) { + throw Error("evaluation worker killed by signal %d", + WTERMSIG(rc)); + } // else ignore WIFSTOPPED and WIFCONTINUED + } + } +} + std::function collector(Sync &state_, std::condition_variable &wakeup) { return [&]() { @@ -482,7 +527,9 @@ std::function collector(Sync &state_, auto state(state_.lock()); if ((state->todo.empty() && state->active.empty()) || state->exc) { - writeLine(proc->to.get(), "exit"); + if (tryWriteLine(proc->to.get(), "exit") < 0) { + handleBrokenWorkerPipe(proc->pid); + } return; } if (!state->todo.empty()) { @@ -495,7 +542,9 @@ std::function collector(Sync &state_, } /* Tell the worker to evaluate it. */ - writeLine(proc->to.get(), "do " + attrPath.dump()); + if (tryWriteLine(proc->to.get(), "do " + attrPath.dump()) < 0) { + handleBrokenWorkerPipe(proc->pid); + } /* Wait for the response. */ auto respString = readLine(proc->from.get());