Use Proc struct to manage forked processes.

Cleans up zombie processes.
This commit is contained in:
John Soo 2022-04-21 10:46:38 -07:00
parent 9d4c256fa0
commit a27faabd0a
No known key found for this signature in database
GPG key ID: D8A148F8CE4DDBC2

View file

@ -177,8 +177,7 @@ static void worker(
EvalState & state, EvalState & state,
Bindings & autoArgs, Bindings & autoArgs,
AutoCloseFD & to, AutoCloseFD & to,
AutoCloseFD & from, AutoCloseFD & from)
const Path &gcRootsDir)
{ {
auto vRoot = topLevelValue(state, autoArgs); auto vRoot = topLevelValue(state, autoArgs);
@ -243,8 +242,8 @@ static void worker(
/* Register the derivation as a GC root. !!! This /* Register the derivation as a GC root. !!! This
registers roots for jobs that we may have already registers roots for jobs that we may have already
done. */ done. */
if (gcRootsDir != "") { if (myArgs.gcRootsDir != "") {
Path root = gcRootsDir + "/" + std::string(baseNameOf(drvPath)); Path root = myArgs.gcRootsDir + "/" + std::string(baseNameOf(drvPath));
if (!pathExists(root)) if (!pathExists(root))
localStore->addPermRoot(storePath, root); localStore->addPermRoot(storePath, root);
} }
@ -391,47 +390,18 @@ int main(int argc, char * * argv)
auto handler = [&]() auto handler = [&]()
{ {
try { try {
pid_t pid = -1; std::optional<std::unique_ptr<Proc>> proc_;
AutoCloseFD from, to;
while (true) { while (true) {
/* Start a new worker process if necessary. */ auto proc = proc_.has_value()
if (pid == -1) { ? std::move(proc_.value())
Pipe toPipe, fromPipe; : std::make_unique<Proc>(worker);
toPipe.create();
fromPipe.create();
pid = startProcess(
[&,
to{std::make_shared<AutoCloseFD>(std::move(fromPipe.writeSide))},
from{std::make_shared<AutoCloseFD>(std::move(toPipe.readSide))}
]()
{
try {
EvalState state(myArgs.searchPath, openStore());
Bindings & autoArgs = *myArgs.getAutoArgs(state);
worker(state, autoArgs, *to, *from, myArgs.gcRootsDir);
} catch (Error & e) {
nlohmann::json err;
auto msg = e.msg();
err["error"] = filterANSIEscapes(msg, true);
printError(msg);
writeLine(to->get(), err.dump());
// Don't forget to print it into the STDERR log, this is
// what's shown in the Hydra UI.
writeLine(to->get(), "restart");
}
},
ProcessOptions { .allowVfork = false });
from = std::move(fromPipe.readSide);
to = std::move(toPipe.writeSide);
debug("created worker process %d", pid);
}
/* Check whether the existing worker process is still there. */ /* Check whether the existing worker process is still there. */
auto s = readLine(from.get()); auto s = readLine(proc->from.get());
if (s == "restart") { if (s == "restart") {
pid = -1; proc_ = std::nullopt;
continue; continue;
} else if (s != "next") { } else if (s != "next") {
auto json = nlohmann::json::parse(s); auto json = nlohmann::json::parse(s);
@ -445,7 +415,7 @@ int main(int argc, char * * argv)
checkInterrupt(); checkInterrupt();
auto state(state_.lock()); auto state(state_.lock());
if ((state->todo.empty() && state->active.empty()) || state->exc) { if ((state->todo.empty() && state->active.empty()) || state->exc) {
writeLine(to.get(), "exit"); writeLine(proc->to.get(), "exit");
return; return;
} }
if (!state->todo.empty()) { if (!state->todo.empty()) {
@ -458,10 +428,10 @@ int main(int argc, char * * argv)
} }
/* Tell the worker to evaluate it. */ /* Tell the worker to evaluate it. */
writeLine(to.get(), "do " + attrPath); writeLine(proc->to.get(), "do " + attrPath);
/* Wait for the response. */ /* Wait for the response. */
auto respString = readLine(from.get()); auto respString = readLine(proc->from.get());
auto response = nlohmann::json::parse(respString); auto response = nlohmann::json::parse(respString);
/* Handle the response. */ /* Handle the response. */
@ -476,6 +446,8 @@ int main(int argc, char * * argv)
std::cout << respString << "\n" << std::flush; std::cout << respString << "\n" << std::flush;
} }
proc_ = std::move(proc);
/* Add newly discovered job names to the queue. */ /* Add newly discovered job names to the queue. */
{ {
auto state(state_.lock()); auto state(state_.lock());