From a27faabd0a3fc0a693433fa5b277587127c14fb3 Mon Sep 17 00:00:00 2001 From: John Soo Date: Thu, 21 Apr 2022 10:46:38 -0700 Subject: [PATCH] Use Proc struct to manage forked processes. Cleans up zombie processes. --- src/nix-eval-jobs.cc | 56 +++++++++++--------------------------------- 1 file changed, 14 insertions(+), 42 deletions(-) diff --git a/src/nix-eval-jobs.cc b/src/nix-eval-jobs.cc index 045577b..d71cf8d 100644 --- a/src/nix-eval-jobs.cc +++ b/src/nix-eval-jobs.cc @@ -177,8 +177,7 @@ static void worker( EvalState & state, Bindings & autoArgs, AutoCloseFD & to, - AutoCloseFD & from, - const Path &gcRootsDir) + AutoCloseFD & from) { auto vRoot = topLevelValue(state, autoArgs); @@ -243,8 +242,8 @@ static void worker( /* Register the derivation as a GC root. !!! This registers roots for jobs that we may have already done. */ - if (gcRootsDir != "") { - Path root = gcRootsDir + "/" + std::string(baseNameOf(drvPath)); + if (myArgs.gcRootsDir != "") { + Path root = myArgs.gcRootsDir + "/" + std::string(baseNameOf(drvPath)); if (!pathExists(root)) localStore->addPermRoot(storePath, root); } @@ -391,47 +390,18 @@ int main(int argc, char * * argv) auto handler = [&]() { try { - pid_t pid = -1; - AutoCloseFD from, to; + std::optional> proc_; while (true) { - /* Start a new worker process if necessary. */ - if (pid == -1) { - Pipe toPipe, fromPipe; - toPipe.create(); - fromPipe.create(); - pid = startProcess( - [&, - to{std::make_shared(std::move(fromPipe.writeSide))}, - from{std::make_shared(std::move(toPipe.readSide))} - ]() - { - try { - EvalState state(myArgs.searchPath, openStore()); - Bindings & autoArgs = *myArgs.getAutoArgs(state); - worker(state, autoArgs, *to, *from, myArgs.gcRootsDir); - } catch (Error & e) { - nlohmann::json err; - auto msg = e.msg(); - err["error"] = filterANSIEscapes(msg, true); - printError(msg); - writeLine(to->get(), err.dump()); - // Don't forget to print it into the STDERR log, this is - // what's shown in the Hydra UI. - writeLine(to->get(), "restart"); - } - }, - ProcessOptions { .allowVfork = false }); - from = std::move(fromPipe.readSide); - to = std::move(toPipe.writeSide); - debug("created worker process %d", pid); - } + auto proc = proc_.has_value() + ? std::move(proc_.value()) + : std::make_unique(worker); /* Check whether the existing worker process is still there. */ - auto s = readLine(from.get()); + auto s = readLine(proc->from.get()); if (s == "restart") { - pid = -1; + proc_ = std::nullopt; continue; } else if (s != "next") { auto json = nlohmann::json::parse(s); @@ -445,7 +415,7 @@ int main(int argc, char * * argv) checkInterrupt(); auto state(state_.lock()); if ((state->todo.empty() && state->active.empty()) || state->exc) { - writeLine(to.get(), "exit"); + writeLine(proc->to.get(), "exit"); return; } if (!state->todo.empty()) { @@ -458,10 +428,10 @@ int main(int argc, char * * argv) } /* Tell the worker to evaluate it. */ - writeLine(to.get(), "do " + attrPath); + writeLine(proc->to.get(), "do " + attrPath); /* Wait for the response. */ - auto respString = readLine(from.get()); + auto respString = readLine(proc->from.get()); auto response = nlohmann::json::parse(respString); /* Handle the response. */ @@ -476,6 +446,8 @@ int main(int argc, char * * argv) std::cout << respString << "\n" << std::flush; } + proc_ = std::move(proc); + /* Add newly discovered job names to the queue. */ { auto state(state_.lock());