Move collecting handler to separate function.
s/master/collector/g
This commit is contained in:
parent
a27faabd0a
commit
b9a87464a0
|
@ -182,7 +182,7 @@ static void worker(
|
|||
auto vRoot = topLevelValue(state, autoArgs);
|
||||
|
||||
while (true) {
|
||||
/* Wait for the master to send us a job name. */
|
||||
/* Wait for the collector to send us a job name. */
|
||||
writeLine(to.get(), "next");
|
||||
|
||||
auto s = readLine(from.get());
|
||||
|
@ -192,7 +192,7 @@ static void worker(
|
|||
|
||||
debug("worker process %d at '%s'", getpid(), attrPath);
|
||||
|
||||
/* Evaluate it and send info back to the master. */
|
||||
/* Evaluate it and send info back to the collector. */
|
||||
nlohmann::json reply;
|
||||
reply["attr"] = attrPath;
|
||||
|
||||
|
@ -287,7 +287,7 @@ static void worker(
|
|||
|
||||
writeLine(to.get(), reply.dump());
|
||||
|
||||
/* If our RSS exceeds the maximum, exit. The master will
|
||||
/* If our RSS exceeds the maximum, exit. The collector will
|
||||
start a new process. */
|
||||
struct rusage r;
|
||||
getrusage(RUSAGE_SELF, &r);
|
||||
|
@ -341,6 +341,91 @@ struct Proc {
|
|||
~Proc() { }
|
||||
};
|
||||
|
||||
struct State
|
||||
{
|
||||
std::set<std::string> todo{""};
|
||||
std::set<std::string> active;
|
||||
std::exception_ptr exc;
|
||||
};
|
||||
|
||||
std::function<void()> collector(Sync<State> & state_, std::condition_variable & wakeup) {
|
||||
return [&]() {
|
||||
try {
|
||||
std::optional<std::unique_ptr<Proc>> proc_;
|
||||
|
||||
while (true) {
|
||||
|
||||
auto proc = proc_.has_value()
|
||||
? std::move(proc_.value())
|
||||
: std::make_unique<Proc>(worker);
|
||||
|
||||
/* Check whether the existing worker process is still there. */
|
||||
auto s = readLine(proc->from.get());
|
||||
if (s == "restart") {
|
||||
proc_ = std::nullopt;
|
||||
continue;
|
||||
} else if (s != "next") {
|
||||
auto json = nlohmann::json::parse(s);
|
||||
throw Error("worker error: %s", (std::string) json["error"]);
|
||||
}
|
||||
|
||||
/* Wait for a job name to become available. */
|
||||
std::string attrPath;
|
||||
|
||||
while (true) {
|
||||
checkInterrupt();
|
||||
auto state(state_.lock());
|
||||
if ((state->todo.empty() && state->active.empty()) || state->exc) {
|
||||
writeLine(proc->to.get(), "exit");
|
||||
return;
|
||||
}
|
||||
if (!state->todo.empty()) {
|
||||
attrPath = *state->todo.begin();
|
||||
state->todo.erase(state->todo.begin());
|
||||
state->active.insert(attrPath);
|
||||
break;
|
||||
} else
|
||||
state.wait(wakeup);
|
||||
}
|
||||
|
||||
/* Tell the worker to evaluate it. */
|
||||
writeLine(proc->to.get(), "do " + attrPath);
|
||||
|
||||
/* Wait for the response. */
|
||||
auto respString = readLine(proc->from.get());
|
||||
auto response = nlohmann::json::parse(respString);
|
||||
|
||||
/* Handle the response. */
|
||||
StringSet newAttrs;
|
||||
if (response.find("attrs") != response.end()) {
|
||||
for (auto & i : response["attrs"]) {
|
||||
auto s = (attrPath.empty() ? "" : attrPath + ".") + (std::string) i;
|
||||
newAttrs.insert(s);
|
||||
}
|
||||
} else {
|
||||
auto state(state_.lock());
|
||||
std::cout << respString << "\n" << std::flush;
|
||||
}
|
||||
|
||||
proc_ = std::move(proc);
|
||||
|
||||
/* Add newly discovered job names to the queue. */
|
||||
{
|
||||
auto state(state_.lock());
|
||||
state->active.erase(attrPath);
|
||||
for (auto & s : newAttrs)
|
||||
state->todo.insert(s);
|
||||
wakeup.notify_all();
|
||||
}
|
||||
}
|
||||
} catch (...) {
|
||||
auto state(state_.lock());
|
||||
state->exc = std::current_exception();
|
||||
wakeup.notify_all();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
int main(int argc, char * * argv)
|
||||
{
|
||||
/* Prevent undeclared dependencies in the evaluation via
|
||||
|
@ -375,98 +460,13 @@ int main(int argc, char * * argv)
|
|||
loggerSettings.showTrace.assign(true);
|
||||
}
|
||||
|
||||
struct State
|
||||
{
|
||||
std::set<std::string> todo{""};
|
||||
std::set<std::string> active;
|
||||
std::exception_ptr exc;
|
||||
};
|
||||
|
||||
std::condition_variable wakeup;
|
||||
|
||||
Sync<State> state_;
|
||||
|
||||
/* Start a handler thread per worker process. */
|
||||
auto handler = [&]()
|
||||
{
|
||||
try {
|
||||
std::optional<std::unique_ptr<Proc>> proc_;
|
||||
|
||||
while (true) {
|
||||
|
||||
auto proc = proc_.has_value()
|
||||
? std::move(proc_.value())
|
||||
: std::make_unique<Proc>(worker);
|
||||
|
||||
/* Check whether the existing worker process is still there. */
|
||||
auto s = readLine(proc->from.get());
|
||||
if (s == "restart") {
|
||||
proc_ = std::nullopt;
|
||||
continue;
|
||||
} else if (s != "next") {
|
||||
auto json = nlohmann::json::parse(s);
|
||||
throw Error("worker error: %s", (std::string) json["error"]);
|
||||
}
|
||||
|
||||
/* Wait for a job name to become available. */
|
||||
std::string attrPath;
|
||||
|
||||
while (true) {
|
||||
checkInterrupt();
|
||||
auto state(state_.lock());
|
||||
if ((state->todo.empty() && state->active.empty()) || state->exc) {
|
||||
writeLine(proc->to.get(), "exit");
|
||||
return;
|
||||
}
|
||||
if (!state->todo.empty()) {
|
||||
attrPath = *state->todo.begin();
|
||||
state->todo.erase(state->todo.begin());
|
||||
state->active.insert(attrPath);
|
||||
break;
|
||||
} else
|
||||
state.wait(wakeup);
|
||||
}
|
||||
|
||||
/* Tell the worker to evaluate it. */
|
||||
writeLine(proc->to.get(), "do " + attrPath);
|
||||
|
||||
/* Wait for the response. */
|
||||
auto respString = readLine(proc->from.get());
|
||||
auto response = nlohmann::json::parse(respString);
|
||||
|
||||
/* Handle the response. */
|
||||
StringSet newAttrs;
|
||||
if (response.find("attrs") != response.end()) {
|
||||
for (auto & i : response["attrs"]) {
|
||||
auto s = (attrPath.empty() ? "" : attrPath + ".") + (std::string) i;
|
||||
newAttrs.insert(s);
|
||||
}
|
||||
} else {
|
||||
auto state(state_.lock());
|
||||
std::cout << respString << "\n" << std::flush;
|
||||
}
|
||||
|
||||
proc_ = std::move(proc);
|
||||
|
||||
/* Add newly discovered job names to the queue. */
|
||||
{
|
||||
auto state(state_.lock());
|
||||
state->active.erase(attrPath);
|
||||
for (auto & s : newAttrs)
|
||||
state->todo.insert(s);
|
||||
wakeup.notify_all();
|
||||
}
|
||||
}
|
||||
} catch (...) {
|
||||
auto state(state_.lock());
|
||||
state->exc = std::current_exception();
|
||||
wakeup.notify_all();
|
||||
}
|
||||
};
|
||||
|
||||
/* Start a collector thread per worker process. */
|
||||
std::vector<std::thread> threads;
|
||||
std::condition_variable wakeup;
|
||||
for (size_t i = 0; i < myArgs.nrWorkers; i++)
|
||||
threads.emplace_back(std::thread(handler));
|
||||
threads.emplace_back(std::thread(collector(state_, wakeup)));
|
||||
|
||||
for (auto & thread : threads)
|
||||
thread.join();
|
||||
|
|
Loading…
Reference in a new issue