Merge pull request #55 from jsoo1/kill-zombies

Kill zombies.
This commit is contained in:
adisbladis 2022-04-22 14:33:16 +07:00 committed by GitHub
commit 5f9d57e9b1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -177,13 +177,12 @@ static void worker(
EvalState & state, EvalState & state,
Bindings & autoArgs, Bindings & autoArgs,
AutoCloseFD & to, AutoCloseFD & to,
AutoCloseFD & from, AutoCloseFD & from)
const Path &gcRootsDir)
{ {
auto vRoot = topLevelValue(state, autoArgs); auto vRoot = topLevelValue(state, autoArgs);
while (true) { while (true) {
/* Wait for the master to send us a job name. */ /* Wait for the collector to send us a job name. */
writeLine(to.get(), "next"); writeLine(to.get(), "next");
auto s = readLine(from.get()); auto s = readLine(from.get());
@ -193,7 +192,7 @@ static void worker(
debug("worker process %d at '%s'", getpid(), attrPath); debug("worker process %d at '%s'", getpid(), attrPath);
/* Evaluate it and send info back to the master. */ /* Evaluate it and send info back to the collector. */
nlohmann::json reply; nlohmann::json reply;
reply["attr"] = attrPath; reply["attr"] = attrPath;
@ -245,8 +244,8 @@ static void worker(
/* Register the derivation as a GC root. !!! This /* Register the derivation as a GC root. !!! This
registers roots for jobs that we may have already registers roots for jobs that we may have already
done. */ done. */
if (gcRootsDir != "") { if (myArgs.gcRootsDir != "") {
Path root = gcRootsDir + "/" + std::string(baseNameOf(drvPath)); Path root = myArgs.gcRootsDir + "/" + std::string(baseNameOf(drvPath));
if (!pathExists(root)) if (!pathExists(root))
localStore->addPermRoot(storePath, root); localStore->addPermRoot(storePath, root);
} }
@ -290,7 +289,7 @@ static void worker(
writeLine(to.get(), reply.dump()); writeLine(to.get(), reply.dump());
/* If our RSS exceeds the maximum, exit. The master will /* If our RSS exceeds the maximum, exit. The collector will
start a new process. */ start a new process. */
struct rusage r; struct rusage r;
getrusage(RUSAGE_SELF, &r); getrusage(RUSAGE_SELF, &r);
@ -300,6 +299,135 @@ static void worker(
writeLine(to.get(), "restart"); writeLine(to.get(), "restart");
} }
typedef std::function<void(EvalState & state, Bindings & autoArgs, AutoCloseFD & to, AutoCloseFD & from)>
Processor;
/* Auto-cleanup of fork's process and fds. */
struct Proc {
AutoCloseFD to, from;
Pid pid;
Proc(const Processor & proc) {
Pipe toPipe, fromPipe;
toPipe.create();
fromPipe.create();
auto p = startProcess(
[&,
to{std::make_shared<AutoCloseFD>(std::move(fromPipe.writeSide))},
from{std::make_shared<AutoCloseFD>(std::move(toPipe.readSide))}
]()
{
debug("created worker process %d", getpid());
try {
EvalState state(myArgs.searchPath, openStore());
Bindings & autoArgs = *myArgs.getAutoArgs(state);
proc(state, autoArgs, *to, *from);
} catch (Error & e) {
nlohmann::json err;
auto msg = e.msg();
err["error"] = filterANSIEscapes(msg, true);
printError(msg);
writeLine(to->get(), err.dump());
// Don't forget to print it into the STDERR log, this is
// what's shown in the Hydra UI.
writeLine(to->get(), "restart");
}
},
ProcessOptions { .allowVfork = false });
to = std::move(toPipe.writeSide);
from = std::move(fromPipe.readSide);
pid = p;
}
~Proc() { }
};
struct State
{
std::set<std::string> todo{""};
std::set<std::string> active;
std::exception_ptr exc;
};
std::function<void()> collector(Sync<State> & state_, std::condition_variable & wakeup) {
return [&]() {
try {
std::optional<std::unique_ptr<Proc>> proc_;
while (true) {
auto proc = proc_.has_value()
? std::move(proc_.value())
: std::make_unique<Proc>(worker);
/* Check whether the existing worker process is still there. */
auto s = readLine(proc->from.get());
if (s == "restart") {
proc_ = std::nullopt;
continue;
} else if (s != "next") {
auto json = nlohmann::json::parse(s);
throw Error("worker error: %s", (std::string) json["error"]);
}
/* Wait for a job name to become available. */
std::string attrPath;
while (true) {
checkInterrupt();
auto state(state_.lock());
if ((state->todo.empty() && state->active.empty()) || state->exc) {
writeLine(proc->to.get(), "exit");
return;
}
if (!state->todo.empty()) {
attrPath = *state->todo.begin();
state->todo.erase(state->todo.begin());
state->active.insert(attrPath);
break;
} else
state.wait(wakeup);
}
/* Tell the worker to evaluate it. */
writeLine(proc->to.get(), "do " + attrPath);
/* Wait for the response. */
auto respString = readLine(proc->from.get());
auto response = nlohmann::json::parse(respString);
/* Handle the response. */
StringSet newAttrs;
if (response.find("attrs") != response.end()) {
for (auto & i : response["attrs"]) {
auto s = (attrPath.empty() ? "" : attrPath + ".") + (std::string) i;
newAttrs.insert(s);
}
} else {
auto state(state_.lock());
std::cout << respString << "\n" << std::flush;
}
proc_ = std::move(proc);
/* Add newly discovered job names to the queue. */
{
auto state(state_.lock());
state->active.erase(attrPath);
for (auto & s : newAttrs)
state->todo.insert(s);
wakeup.notify_all();
}
}
} catch (...) {
auto state(state_.lock());
state->exc = std::current_exception();
wakeup.notify_all();
}
};
}
int main(int argc, char * * argv) int main(int argc, char * * argv)
{ {
/* Prevent undeclared dependencies in the evaluation via /* Prevent undeclared dependencies in the evaluation via
@ -334,125 +462,13 @@ int main(int argc, char * * argv)
loggerSettings.showTrace.assign(true); loggerSettings.showTrace.assign(true);
} }
struct State
{
std::set<std::string> todo{""};
std::set<std::string> active;
std::exception_ptr exc;
};
std::condition_variable wakeup;
Sync<State> state_; Sync<State> state_;
/* Start a handler thread per worker process. */ /* Start a collector thread per worker process. */
auto handler = [&]()
{
try {
pid_t pid = -1;
AutoCloseFD from, to;
while (true) {
/* Start a new worker process if necessary. */
if (pid == -1) {
Pipe toPipe, fromPipe;
toPipe.create();
fromPipe.create();
pid = startProcess(
[&,
to{std::make_shared<AutoCloseFD>(std::move(fromPipe.writeSide))},
from{std::make_shared<AutoCloseFD>(std::move(toPipe.readSide))}
]()
{
try {
EvalState state(myArgs.searchPath, openStore());
Bindings & autoArgs = *myArgs.getAutoArgs(state);
worker(state, autoArgs, *to, *from, myArgs.gcRootsDir);
} catch (Error & e) {
nlohmann::json err;
auto msg = e.msg();
err["error"] = filterANSIEscapes(msg, true);
printError(msg);
writeLine(to->get(), err.dump());
// Don't forget to print it into the STDERR log, this is
// what's shown in the Hydra UI.
writeLine(to->get(), "restart");
}
},
ProcessOptions { .allowVfork = false });
from = std::move(fromPipe.readSide);
to = std::move(toPipe.writeSide);
debug("created worker process %d", pid);
}
/* Check whether the existing worker process is still there. */
auto s = readLine(from.get());
if (s == "restart") {
pid = -1;
continue;
} else if (s != "next") {
auto json = nlohmann::json::parse(s);
throw Error("worker error: %s", (std::string) json["error"]);
}
/* Wait for a job name to become available. */
std::string attrPath;
while (true) {
checkInterrupt();
auto state(state_.lock());
if ((state->todo.empty() && state->active.empty()) || state->exc) {
writeLine(to.get(), "exit");
return;
}
if (!state->todo.empty()) {
attrPath = *state->todo.begin();
state->todo.erase(state->todo.begin());
state->active.insert(attrPath);
break;
} else
state.wait(wakeup);
}
/* Tell the worker to evaluate it. */
writeLine(to.get(), "do " + attrPath);
/* Wait for the response. */
auto respString = readLine(from.get());
auto response = nlohmann::json::parse(respString);
/* Handle the response. */
StringSet newAttrs;
if (response.find("attrs") != response.end()) {
for (auto & i : response["attrs"]) {
auto s = (attrPath.empty() ? "" : attrPath + ".") + (std::string) i;
newAttrs.insert(s);
}
} else {
auto state(state_.lock());
std::cout << respString << "\n" << std::flush;
}
/* Add newly discovered job names to the queue. */
{
auto state(state_.lock());
state->active.erase(attrPath);
for (auto & s : newAttrs)
state->todo.insert(s);
wakeup.notify_all();
}
}
} catch (...) {
auto state(state_.lock());
state->exc = std::current_exception();
wakeup.notify_all();
}
};
std::vector<std::thread> threads; std::vector<std::thread> threads;
std::condition_variable wakeup;
for (size_t i = 0; i < myArgs.nrWorkers; i++) for (size_t i = 0; i < myArgs.nrWorkers; i++)
threads.emplace_back(std::thread(handler)); threads.emplace_back(std::thread(collector(state_, wakeup)));
for (auto & thread : threads) for (auto & thread : threads)
thread.join(); thread.join();