simplify collector function

looks like the lambda doesn't buy us anything here.
This commit is contained in:
Jörg Thalheim 2023-12-10 17:19:47 +01:00 committed by mergify[bot]
parent 97dd8023e8
commit b24c03e2de

View file

@ -119,117 +119,113 @@ void handleBrokenWorkerPipe(Proc &proc) {
} }
} }
std::function<void()> collector(Sync<State> &state_, void collector(Sync<State> &state_, std::condition_variable &wakeup) {
std::condition_variable &wakeup) { try {
return [&]() { std::optional<std::unique_ptr<Proc>> proc_;
try { std::optional<std::unique_ptr<LineReader>> fromReader_;
std::optional<std::unique_ptr<Proc>> proc_;
std::optional<std::unique_ptr<LineReader>> fromReader_;
while (true) { while (true) {
if (!proc_.has_value()) { if (!proc_.has_value()) {
proc_ = std::make_unique<Proc>(worker); proc_ = std::make_unique<Proc>(worker);
fromReader_ = std::make_unique<LineReader>( fromReader_ =
proc_.value()->from.release()); std::make_unique<LineReader>(proc_.value()->from.release());
} }
auto proc = std::move(proc_.value()); auto proc = std::move(proc_.value());
auto fromReader = std::move(fromReader_.value()); auto fromReader = std::move(fromReader_.value());
/* Check whether the existing worker process is still there. */ /* Check whether the existing worker process is still there. */
auto s = fromReader->readLine(); auto s = fromReader->readLine();
if (s == "") { if (s == "") {
handleBrokenWorkerPipe(*proc.get()); handleBrokenWorkerPipe(*proc.get());
} else if (s == "restart") { } else if (s == "restart") {
proc_ = std::nullopt; proc_ = std::nullopt;
fromReader_ = std::nullopt; fromReader_ = std::nullopt;
continue; continue;
} else if (s != "next") { } else if (s != "next") {
try {
auto json = json::parse(s);
throw Error("worker error: %s",
(std::string)json["error"]);
} catch (const json::exception &e) {
throw Error(
"Received invalid JSON from worker: %s '%s'",
e.what(), s);
}
}
/* Wait for a job name to become available. */
json attrPath;
while (true) {
checkInterrupt();
auto state(state_.lock());
if ((state->todo.empty() && state->active.empty()) ||
state->exc) {
if (tryWriteLine(proc->to.get(), "exit") < 0) {
handleBrokenWorkerPipe(*proc.get());
}
return;
}
if (!state->todo.empty()) {
attrPath = *state->todo.begin();
state->todo.erase(state->todo.begin());
state->active.insert(attrPath);
break;
} else
state.wait(wakeup);
}
/* Tell the worker to evaluate it. */
if (tryWriteLine(proc->to.get(), "do " + attrPath.dump()) < 0) {
handleBrokenWorkerPipe(*proc.get());
}
/* Wait for the response. */
auto respString = fromReader->readLine();
if (respString == "") {
handleBrokenWorkerPipe(*proc.get());
}
json response;
try { try {
response = json::parse(respString); auto json = json::parse(s);
throw Error("worker error: %s", (std::string)json["error"]);
} catch (const json::exception &e) { } catch (const json::exception &e) {
throw Error("Received invalid JSON from worker: %s '%s'", throw Error("Received invalid JSON from worker: %s '%s'",
e.what(), respString); e.what(), s);
}
/* Handle the response. */
std::vector<json> newAttrs;
if (response.find("attrs") != response.end()) {
for (auto &i : response["attrs"]) {
json newAttr = json(response["attrPath"]);
newAttr.emplace_back(i);
newAttrs.push_back(newAttr);
}
} else {
auto state(state_.lock());
std::cout << respString << "\n" << std::flush;
}
proc_ = std::move(proc);
fromReader_ = std::move(fromReader);
/* Add newly discovered job names to the queue. */
{
auto state(state_.lock());
state->active.erase(attrPath);
for (auto p : newAttrs) {
state->todo.insert(p);
}
wakeup.notify_all();
} }
} }
} catch (...) {
auto state(state_.lock()); /* Wait for a job name to become available. */
state->exc = std::current_exception(); json attrPath;
wakeup.notify_all();
while (true) {
checkInterrupt();
auto state(state_.lock());
if ((state->todo.empty() && state->active.empty()) ||
state->exc) {
if (tryWriteLine(proc->to.get(), "exit") < 0) {
handleBrokenWorkerPipe(*proc.get());
}
return;
}
if (!state->todo.empty()) {
attrPath = *state->todo.begin();
state->todo.erase(state->todo.begin());
state->active.insert(attrPath);
break;
} else
state.wait(wakeup);
}
/* Tell the worker to evaluate it. */
if (tryWriteLine(proc->to.get(), "do " + attrPath.dump()) < 0) {
handleBrokenWorkerPipe(*proc.get());
}
/* Wait for the response. */
auto respString = fromReader->readLine();
if (respString == "") {
handleBrokenWorkerPipe(*proc.get());
}
json response;
try {
response = json::parse(respString);
} catch (const json::exception &e) {
throw Error("Received invalid JSON from worker: %s '%s'",
e.what(), respString);
}
/* Handle the response. */
std::vector<json> newAttrs;
if (response.find("attrs") != response.end()) {
for (auto &i : response["attrs"]) {
json newAttr = json(response["attrPath"]);
newAttr.emplace_back(i);
newAttrs.push_back(newAttr);
}
} else {
auto state(state_.lock());
std::cout << respString << "\n" << std::flush;
}
proc_ = std::move(proc);
fromReader_ = std::move(fromReader);
/* Add newly discovered job names to the queue. */
{
auto state(state_.lock());
state->active.erase(attrPath);
for (auto p : newAttrs) {
state->todo.insert(p);
}
wakeup.notify_all();
}
} }
}; } catch (...) {
auto state(state_.lock());
state->exc = std::current_exception();
wakeup.notify_all();
}
} }
int main(int argc, char **argv) { int main(int argc, char **argv) {
/* Prevent undeclared dependencies in the evaluation via /* Prevent undeclared dependencies in the evaluation via
$NIX_PATH. */ $NIX_PATH. */
unsetenv("NIX_PATH"); unsetenv("NIX_PATH");
@ -277,8 +273,9 @@ int main(int argc, char **argv) {
/* Start a collector thread per worker process. */ /* Start a collector thread per worker process. */
std::vector<std::thread> threads; std::vector<std::thread> threads;
std::condition_variable wakeup; std::condition_variable wakeup;
for (size_t i = 0; i < myArgs.nrWorkers; i++) for (size_t i = 0; i < myArgs.nrWorkers; i++) {
threads.emplace_back(std::thread(collector(state_, wakeup))); threads.emplace_back(collector, std::ref(state_), std::ref(wakeup));
}
for (auto &thread : threads) for (auto &thread : threads)
thread.join(); thread.join();