diff --git a/src/nix-eval-jobs.cc b/src/nix-eval-jobs.cc index 628a245..7c0ff50 100644 --- a/src/nix-eval-jobs.cc +++ b/src/nix-eval-jobs.cc @@ -177,13 +177,12 @@ static void worker( EvalState & state, Bindings & autoArgs, AutoCloseFD & to, - AutoCloseFD & from, - const Path &gcRootsDir) + AutoCloseFD & from) { auto vRoot = topLevelValue(state, autoArgs); while (true) { - /* Wait for the master to send us a job name. */ + /* Wait for the collector to send us a job name. */ writeLine(to.get(), "next"); auto s = readLine(from.get()); @@ -193,7 +192,7 @@ static void worker( debug("worker process %d at '%s'", getpid(), attrPath); - /* Evaluate it and send info back to the master. */ + /* Evaluate it and send info back to the collector. */ nlohmann::json reply; reply["attr"] = attrPath; @@ -245,8 +244,8 @@ static void worker( /* Register the derivation as a GC root. !!! This registers roots for jobs that we may have already done. */ - if (gcRootsDir != "") { - Path root = gcRootsDir + "/" + std::string(baseNameOf(drvPath)); + if (myArgs.gcRootsDir != "") { + Path root = myArgs.gcRootsDir + "/" + std::string(baseNameOf(drvPath)); if (!pathExists(root)) localStore->addPermRoot(storePath, root); } @@ -290,7 +289,7 @@ static void worker( writeLine(to.get(), reply.dump()); - /* If our RSS exceeds the maximum, exit. The master will + /* If our RSS exceeds the maximum, exit. The collector will start a new process. */ struct rusage r; getrusage(RUSAGE_SELF, &r); @@ -300,6 +299,135 @@ static void worker( writeLine(to.get(), "restart"); } +typedef std::function + Processor; + +/* Auto-cleanup of fork's process and fds. */ +struct Proc { + AutoCloseFD to, from; + Pid pid; + + Proc(const Processor & proc) { + Pipe toPipe, fromPipe; + toPipe.create(); + fromPipe.create(); + auto p = startProcess( + [&, + to{std::make_shared(std::move(fromPipe.writeSide))}, + from{std::make_shared(std::move(toPipe.readSide))} + ]() + { + debug("created worker process %d", getpid()); + try { + EvalState state(myArgs.searchPath, openStore()); + Bindings & autoArgs = *myArgs.getAutoArgs(state); + proc(state, autoArgs, *to, *from); + } catch (Error & e) { + nlohmann::json err; + auto msg = e.msg(); + err["error"] = filterANSIEscapes(msg, true); + printError(msg); + writeLine(to->get(), err.dump()); + // Don't forget to print it into the STDERR log, this is + // what's shown in the Hydra UI. + writeLine(to->get(), "restart"); + } + }, + ProcessOptions { .allowVfork = false }); + + to = std::move(toPipe.writeSide); + from = std::move(fromPipe.readSide); + pid = p; + } + + ~Proc() { } +}; + +struct State +{ + std::set todo{""}; + std::set active; + std::exception_ptr exc; +}; + +std::function collector(Sync & state_, std::condition_variable & wakeup) { + return [&]() { + try { + std::optional> proc_; + + while (true) { + + auto proc = proc_.has_value() + ? std::move(proc_.value()) + : std::make_unique(worker); + + /* Check whether the existing worker process is still there. */ + auto s = readLine(proc->from.get()); + if (s == "restart") { + proc_ = std::nullopt; + continue; + } else if (s != "next") { + auto json = nlohmann::json::parse(s); + throw Error("worker error: %s", (std::string) json["error"]); + } + + /* Wait for a job name to become available. */ + std::string attrPath; + + while (true) { + checkInterrupt(); + auto state(state_.lock()); + if ((state->todo.empty() && state->active.empty()) || state->exc) { + writeLine(proc->to.get(), "exit"); + return; + } + if (!state->todo.empty()) { + attrPath = *state->todo.begin(); + state->todo.erase(state->todo.begin()); + state->active.insert(attrPath); + break; + } else + state.wait(wakeup); + } + + /* Tell the worker to evaluate it. */ + writeLine(proc->to.get(), "do " + attrPath); + + /* Wait for the response. */ + auto respString = readLine(proc->from.get()); + auto response = nlohmann::json::parse(respString); + + /* Handle the response. */ + StringSet newAttrs; + if (response.find("attrs") != response.end()) { + for (auto & i : response["attrs"]) { + auto s = (attrPath.empty() ? "" : attrPath + ".") + (std::string) i; + newAttrs.insert(s); + } + } else { + auto state(state_.lock()); + std::cout << respString << "\n" << std::flush; + } + + proc_ = std::move(proc); + + /* Add newly discovered job names to the queue. */ + { + auto state(state_.lock()); + state->active.erase(attrPath); + for (auto & s : newAttrs) + state->todo.insert(s); + wakeup.notify_all(); + } + } + } catch (...) { + auto state(state_.lock()); + state->exc = std::current_exception(); + wakeup.notify_all(); + } + }; +} + int main(int argc, char * * argv) { /* Prevent undeclared dependencies in the evaluation via @@ -334,125 +462,13 @@ int main(int argc, char * * argv) loggerSettings.showTrace.assign(true); } - struct State - { - std::set todo{""}; - std::set active; - std::exception_ptr exc; - }; - - std::condition_variable wakeup; - Sync state_; - /* Start a handler thread per worker process. */ - auto handler = [&]() - { - try { - pid_t pid = -1; - AutoCloseFD from, to; - - while (true) { - - /* Start a new worker process if necessary. */ - if (pid == -1) { - Pipe toPipe, fromPipe; - toPipe.create(); - fromPipe.create(); - pid = startProcess( - [&, - to{std::make_shared(std::move(fromPipe.writeSide))}, - from{std::make_shared(std::move(toPipe.readSide))} - ]() - { - try { - EvalState state(myArgs.searchPath, openStore()); - Bindings & autoArgs = *myArgs.getAutoArgs(state); - worker(state, autoArgs, *to, *from, myArgs.gcRootsDir); - } catch (Error & e) { - nlohmann::json err; - auto msg = e.msg(); - err["error"] = filterANSIEscapes(msg, true); - printError(msg); - writeLine(to->get(), err.dump()); - // Don't forget to print it into the STDERR log, this is - // what's shown in the Hydra UI. - writeLine(to->get(), "restart"); - } - }, - ProcessOptions { .allowVfork = false }); - from = std::move(fromPipe.readSide); - to = std::move(toPipe.writeSide); - debug("created worker process %d", pid); - } - - /* Check whether the existing worker process is still there. */ - auto s = readLine(from.get()); - if (s == "restart") { - pid = -1; - continue; - } else if (s != "next") { - auto json = nlohmann::json::parse(s); - throw Error("worker error: %s", (std::string) json["error"]); - } - - /* Wait for a job name to become available. */ - std::string attrPath; - - while (true) { - checkInterrupt(); - auto state(state_.lock()); - if ((state->todo.empty() && state->active.empty()) || state->exc) { - writeLine(to.get(), "exit"); - return; - } - if (!state->todo.empty()) { - attrPath = *state->todo.begin(); - state->todo.erase(state->todo.begin()); - state->active.insert(attrPath); - break; - } else - state.wait(wakeup); - } - - /* Tell the worker to evaluate it. */ - writeLine(to.get(), "do " + attrPath); - - /* Wait for the response. */ - auto respString = readLine(from.get()); - auto response = nlohmann::json::parse(respString); - - /* Handle the response. */ - StringSet newAttrs; - if (response.find("attrs") != response.end()) { - for (auto & i : response["attrs"]) { - auto s = (attrPath.empty() ? "" : attrPath + ".") + (std::string) i; - newAttrs.insert(s); - } - } else { - auto state(state_.lock()); - std::cout << respString << "\n" << std::flush; - } - - /* Add newly discovered job names to the queue. */ - { - auto state(state_.lock()); - state->active.erase(attrPath); - for (auto & s : newAttrs) - state->todo.insert(s); - wakeup.notify_all(); - } - } - } catch (...) { - auto state(state_.lock()); - state->exc = std::current_exception(); - wakeup.notify_all(); - } - }; - + /* Start a collector thread per worker process. */ std::vector threads; + std::condition_variable wakeup; for (size_t i = 0; i < myArgs.nrWorkers; i++) - threads.emplace_back(std::thread(handler)); + threads.emplace_back(std::thread(collector(state_, wakeup))); for (auto & thread : threads) thread.join();