From b24c03e2dec7420576b1f27d1d843a46ec75136d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Thalheim?= Date: Sun, 10 Dec 2023 17:19:47 +0100 Subject: [PATCH] simplify collector function looks like the lambda doesn't buy us anything here. --- src/nix-eval-jobs.cc | 201 +++++++++++++++++++++---------------------- 1 file changed, 99 insertions(+), 102 deletions(-) diff --git a/src/nix-eval-jobs.cc b/src/nix-eval-jobs.cc index 87eb456..0429529 100644 --- a/src/nix-eval-jobs.cc +++ b/src/nix-eval-jobs.cc @@ -119,117 +119,113 @@ void handleBrokenWorkerPipe(Proc &proc) { } } -std::function collector(Sync &state_, - std::condition_variable &wakeup) { - return [&]() { - try { - std::optional> proc_; - std::optional> fromReader_; +void collector(Sync &state_, std::condition_variable &wakeup) { + try { + std::optional> proc_; + std::optional> fromReader_; - while (true) { - if (!proc_.has_value()) { - proc_ = std::make_unique(worker); - fromReader_ = std::make_unique( - proc_.value()->from.release()); - } - auto proc = std::move(proc_.value()); - auto fromReader = std::move(fromReader_.value()); + while (true) { + if (!proc_.has_value()) { + proc_ = std::make_unique(worker); + fromReader_ = + std::make_unique(proc_.value()->from.release()); + } + auto proc = std::move(proc_.value()); + auto fromReader = std::move(fromReader_.value()); - /* Check whether the existing worker process is still there. */ - auto s = fromReader->readLine(); - if (s == "") { - handleBrokenWorkerPipe(*proc.get()); - } else if (s == "restart") { - proc_ = std::nullopt; - fromReader_ = std::nullopt; - continue; - } else if (s != "next") { - try { - auto json = json::parse(s); - throw Error("worker error: %s", - (std::string)json["error"]); - } catch (const json::exception &e) { - throw Error( - "Received invalid JSON from worker: %s '%s'", - e.what(), s); - } - } - - /* Wait for a job name to become available. */ - json attrPath; - - while (true) { - checkInterrupt(); - auto state(state_.lock()); - if ((state->todo.empty() && state->active.empty()) || - state->exc) { - if (tryWriteLine(proc->to.get(), "exit") < 0) { - handleBrokenWorkerPipe(*proc.get()); - } - return; - } - if (!state->todo.empty()) { - attrPath = *state->todo.begin(); - state->todo.erase(state->todo.begin()); - state->active.insert(attrPath); - break; - } else - state.wait(wakeup); - } - - /* Tell the worker to evaluate it. */ - if (tryWriteLine(proc->to.get(), "do " + attrPath.dump()) < 0) { - handleBrokenWorkerPipe(*proc.get()); - } - - /* Wait for the response. */ - auto respString = fromReader->readLine(); - if (respString == "") { - handleBrokenWorkerPipe(*proc.get()); - } - json response; + /* Check whether the existing worker process is still there. */ + auto s = fromReader->readLine(); + if (s == "") { + handleBrokenWorkerPipe(*proc.get()); + } else if (s == "restart") { + proc_ = std::nullopt; + fromReader_ = std::nullopt; + continue; + } else if (s != "next") { try { - response = json::parse(respString); + auto json = json::parse(s); + throw Error("worker error: %s", (std::string)json["error"]); } catch (const json::exception &e) { throw Error("Received invalid JSON from worker: %s '%s'", - e.what(), respString); - } - - /* Handle the response. */ - std::vector newAttrs; - if (response.find("attrs") != response.end()) { - for (auto &i : response["attrs"]) { - json newAttr = json(response["attrPath"]); - newAttr.emplace_back(i); - newAttrs.push_back(newAttr); - } - } else { - auto state(state_.lock()); - std::cout << respString << "\n" << std::flush; - } - - proc_ = std::move(proc); - fromReader_ = std::move(fromReader); - - /* Add newly discovered job names to the queue. */ - { - auto state(state_.lock()); - state->active.erase(attrPath); - for (auto p : newAttrs) { - state->todo.insert(p); - } - wakeup.notify_all(); + e.what(), s); } } - } catch (...) { - auto state(state_.lock()); - state->exc = std::current_exception(); - wakeup.notify_all(); + + /* Wait for a job name to become available. */ + json attrPath; + + while (true) { + checkInterrupt(); + auto state(state_.lock()); + if ((state->todo.empty() && state->active.empty()) || + state->exc) { + if (tryWriteLine(proc->to.get(), "exit") < 0) { + handleBrokenWorkerPipe(*proc.get()); + } + return; + } + if (!state->todo.empty()) { + attrPath = *state->todo.begin(); + state->todo.erase(state->todo.begin()); + state->active.insert(attrPath); + break; + } else + state.wait(wakeup); + } + + /* Tell the worker to evaluate it. */ + if (tryWriteLine(proc->to.get(), "do " + attrPath.dump()) < 0) { + handleBrokenWorkerPipe(*proc.get()); + } + + /* Wait for the response. */ + auto respString = fromReader->readLine(); + if (respString == "") { + handleBrokenWorkerPipe(*proc.get()); + } + json response; + try { + response = json::parse(respString); + } catch (const json::exception &e) { + throw Error("Received invalid JSON from worker: %s '%s'", + e.what(), respString); + } + + /* Handle the response. */ + std::vector newAttrs; + if (response.find("attrs") != response.end()) { + for (auto &i : response["attrs"]) { + json newAttr = json(response["attrPath"]); + newAttr.emplace_back(i); + newAttrs.push_back(newAttr); + } + } else { + auto state(state_.lock()); + std::cout << respString << "\n" << std::flush; + } + + proc_ = std::move(proc); + fromReader_ = std::move(fromReader); + + /* Add newly discovered job names to the queue. */ + { + auto state(state_.lock()); + state->active.erase(attrPath); + for (auto p : newAttrs) { + state->todo.insert(p); + } + wakeup.notify_all(); + } } - }; + } catch (...) { + auto state(state_.lock()); + state->exc = std::current_exception(); + wakeup.notify_all(); + } } int main(int argc, char **argv) { + /* Prevent undeclared dependencies in the evaluation via $NIX_PATH. */ unsetenv("NIX_PATH"); @@ -277,8 +273,9 @@ int main(int argc, char **argv) { /* Start a collector thread per worker process. */ std::vector threads; std::condition_variable wakeup; - for (size_t i = 0; i < myArgs.nrWorkers; i++) - threads.emplace_back(std::thread(collector(state_, wakeup))); + for (size_t i = 0; i < myArgs.nrWorkers; i++) { + threads.emplace_back(collector, std::ref(state_), std::ref(wakeup)); + } for (auto &thread : threads) thread.join();