From b9a87464a06c60598d0948e974ee6538ab024f60 Mon Sep 17 00:00:00 2001 From: John Soo Date: Wed, 20 Apr 2022 21:01:02 -0700 Subject: [PATCH] Move collecting handler to separate function. s/master/collector/g --- src/nix-eval-jobs.cc | 182 +++++++++++++++++++++---------------------- 1 file changed, 91 insertions(+), 91 deletions(-) diff --git a/src/nix-eval-jobs.cc b/src/nix-eval-jobs.cc index d71cf8d..0e5d78a 100644 --- a/src/nix-eval-jobs.cc +++ b/src/nix-eval-jobs.cc @@ -182,7 +182,7 @@ static void worker( auto vRoot = topLevelValue(state, autoArgs); while (true) { - /* Wait for the master to send us a job name. */ + /* Wait for the collector to send us a job name. */ writeLine(to.get(), "next"); auto s = readLine(from.get()); @@ -192,7 +192,7 @@ static void worker( debug("worker process %d at '%s'", getpid(), attrPath); - /* Evaluate it and send info back to the master. */ + /* Evaluate it and send info back to the collector. */ nlohmann::json reply; reply["attr"] = attrPath; @@ -287,7 +287,7 @@ static void worker( writeLine(to.get(), reply.dump()); - /* If our RSS exceeds the maximum, exit. The master will + /* If our RSS exceeds the maximum, exit. The collector will start a new process. */ struct rusage r; getrusage(RUSAGE_SELF, &r); @@ -341,6 +341,91 @@ struct Proc { ~Proc() { } }; +struct State +{ + std::set todo{""}; + std::set active; + std::exception_ptr exc; +}; + +std::function collector(Sync & state_, std::condition_variable & wakeup) { + return [&]() { + try { + std::optional> proc_; + + while (true) { + + auto proc = proc_.has_value() + ? std::move(proc_.value()) + : std::make_unique(worker); + + /* Check whether the existing worker process is still there. */ + auto s = readLine(proc->from.get()); + if (s == "restart") { + proc_ = std::nullopt; + continue; + } else if (s != "next") { + auto json = nlohmann::json::parse(s); + throw Error("worker error: %s", (std::string) json["error"]); + } + + /* Wait for a job name to become available. */ + std::string attrPath; + + while (true) { + checkInterrupt(); + auto state(state_.lock()); + if ((state->todo.empty() && state->active.empty()) || state->exc) { + writeLine(proc->to.get(), "exit"); + return; + } + if (!state->todo.empty()) { + attrPath = *state->todo.begin(); + state->todo.erase(state->todo.begin()); + state->active.insert(attrPath); + break; + } else + state.wait(wakeup); + } + + /* Tell the worker to evaluate it. */ + writeLine(proc->to.get(), "do " + attrPath); + + /* Wait for the response. */ + auto respString = readLine(proc->from.get()); + auto response = nlohmann::json::parse(respString); + + /* Handle the response. */ + StringSet newAttrs; + if (response.find("attrs") != response.end()) { + for (auto & i : response["attrs"]) { + auto s = (attrPath.empty() ? "" : attrPath + ".") + (std::string) i; + newAttrs.insert(s); + } + } else { + auto state(state_.lock()); + std::cout << respString << "\n" << std::flush; + } + + proc_ = std::move(proc); + + /* Add newly discovered job names to the queue. */ + { + auto state(state_.lock()); + state->active.erase(attrPath); + for (auto & s : newAttrs) + state->todo.insert(s); + wakeup.notify_all(); + } + } + } catch (...) { + auto state(state_.lock()); + state->exc = std::current_exception(); + wakeup.notify_all(); + } + }; +} + int main(int argc, char * * argv) { /* Prevent undeclared dependencies in the evaluation via @@ -375,98 +460,13 @@ int main(int argc, char * * argv) loggerSettings.showTrace.assign(true); } - struct State - { - std::set todo{""}; - std::set active; - std::exception_ptr exc; - }; - - std::condition_variable wakeup; - Sync state_; - /* Start a handler thread per worker process. */ - auto handler = [&]() - { - try { - std::optional> proc_; - - while (true) { - - auto proc = proc_.has_value() - ? std::move(proc_.value()) - : std::make_unique(worker); - - /* Check whether the existing worker process is still there. */ - auto s = readLine(proc->from.get()); - if (s == "restart") { - proc_ = std::nullopt; - continue; - } else if (s != "next") { - auto json = nlohmann::json::parse(s); - throw Error("worker error: %s", (std::string) json["error"]); - } - - /* Wait for a job name to become available. */ - std::string attrPath; - - while (true) { - checkInterrupt(); - auto state(state_.lock()); - if ((state->todo.empty() && state->active.empty()) || state->exc) { - writeLine(proc->to.get(), "exit"); - return; - } - if (!state->todo.empty()) { - attrPath = *state->todo.begin(); - state->todo.erase(state->todo.begin()); - state->active.insert(attrPath); - break; - } else - state.wait(wakeup); - } - - /* Tell the worker to evaluate it. */ - writeLine(proc->to.get(), "do " + attrPath); - - /* Wait for the response. */ - auto respString = readLine(proc->from.get()); - auto response = nlohmann::json::parse(respString); - - /* Handle the response. */ - StringSet newAttrs; - if (response.find("attrs") != response.end()) { - for (auto & i : response["attrs"]) { - auto s = (attrPath.empty() ? "" : attrPath + ".") + (std::string) i; - newAttrs.insert(s); - } - } else { - auto state(state_.lock()); - std::cout << respString << "\n" << std::flush; - } - - proc_ = std::move(proc); - - /* Add newly discovered job names to the queue. */ - { - auto state(state_.lock()); - state->active.erase(attrPath); - for (auto & s : newAttrs) - state->todo.insert(s); - wakeup.notify_all(); - } - } - } catch (...) { - auto state(state_.lock()); - state->exc = std::current_exception(); - wakeup.notify_all(); - } - }; - + /* Start a collector thread per worker process. */ std::vector threads; + std::condition_variable wakeup; for (size_t i = 0; i < myArgs.nrWorkers; i++) - threads.emplace_back(std::thread(handler)); + threads.emplace_back(std::thread(collector(state_, wakeup))); for (auto & thread : threads) thread.join();