diff --git a/src/libstore/build/derivation-goal.cc b/src/libstore/build/derivation-goal.cc index 827c9f541..f40611b31 100644 --- a/src/libstore/build/derivation-goal.cc +++ b/src/libstore/build/derivation-goal.cc @@ -11,7 +11,10 @@ #include "drv-output-substitution-goal.hh" #include "strings.hh" +#include #include +#include +#include #include #include #include @@ -780,7 +783,7 @@ try { buildResult.startTime = time(0); // inexact state = &DerivationGoal::buildDone; started(); - return WaitForWorld{std::move(a.fds), false}; + return WaitForWorld{std::move(a.promise), false}; }, [&](HookReply::Postpone) -> std::optional { /* Not now; wait until at least one child finishes or @@ -992,9 +995,6 @@ try { buildResult.timesBuilt++; buildResult.stopTime = time(0); - /* So the child is gone now. */ - worker.childTerminated(this); - /* Close the read side of the logger pipe. */ closeReadPipes(); @@ -1266,12 +1266,8 @@ HookReply DerivationGoal::tryBuildHook(bool inBuildSlot) /* Create the log file and pipe. */ Path logFile = openLogFile(); - std::set fds; - fds.insert(hook->fromHook.get()); - fds.insert(hook->builderOut.get()); builderOutFD = &hook->builderOut; - - return HookReply::Accept{std::move(fds)}; + return HookReply::Accept{handleChildOutput()}; } @@ -1331,23 +1327,69 @@ void DerivationGoal::closeLogFile() } -Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data) +Goal::Finished DerivationGoal::tooMuchLogs() { - assert(builderOutFD); + killChild(); + return done( + BuildResult::LogLimitExceeded, {}, + Error("%s killed after writing more than %d bytes of log output", + getName(), settings.maxLogSize)); +} - auto tooMuchLogs = [&] { - killChild(); - return done( - BuildResult::LogLimitExceeded, {}, - Error("%s killed after writing more than %d bytes of log output", - getName(), settings.maxLogSize)); - }; +struct DerivationGoal::InputStream final : private kj::AsyncObject +{ + int fd; + kj::UnixEventPort::FdObserver observer; + + InputStream(kj::UnixEventPort & ep, int fd) + : fd(fd) + , observer(ep, fd, kj::UnixEventPort::FdObserver::OBSERVE_READ) + { + int flags = fcntl(fd, F_GETFL); + if (flags < 0) { + throw SysError("fcntl(F_GETFL) failed on fd %i", fd); + } + if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) { + throw SysError("fcntl(F_SETFL) failed on fd %i", fd); + } + } + + kj::Promise read(kj::ArrayPtr buffer) + { + const auto res = ::read(fd, buffer.begin(), buffer.size()); + // closing a pty endpoint causes EIO on the other endpoint. stock kj streams + // do not handle this and throw exceptions we can't ask for errno instead :( + // (we can't use `errno` either because kj may well have mangled it by now.) + if (res == 0 || (res == -1 && errno == EIO)) { + return std::string_view{}; + } + + KJ_NONBLOCKING_SYSCALL(res) {} + + if (res > 0) { + return std::string_view{buffer.begin(), static_cast(res)}; + } + + return observer.whenBecomesReadable().then([this, buffer] { + return read(buffer); + }); + } +}; + +kj::Promise> DerivationGoal::handleBuilderOutput(InputStream & in) noexcept +try { + auto buf = kj::heapArray(4096); + while (true) { + auto data = co_await in.read(buf); + lastChildActivity = worker.aio.provider->getTimer().now(); + + if (data.empty()) { + co_return result::success(); + } - // local & `ssh://`-builds are dealt with here. - if (fd == builderOutFD->get()) { logSize += data.size(); if (settings.maxLogSize && logSize > settings.maxLogSize) { - return tooMuchLogs(); + co_return tooMuchLogs(); } for (auto c : data) @@ -1362,10 +1404,22 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data } if (logSink) (*logSink)(data); - return StillAlive{}; } +} catch (...) { + co_return std::current_exception(); +} + +kj::Promise> DerivationGoal::handleHookOutput(InputStream & in) noexcept +try { + auto buf = kj::heapArray(4096); + while (true) { + auto data = co_await in.read(buf); + lastChildActivity = worker.aio.provider->getTimer().now(); + + if (data.empty()) { + co_return result::success(); + } - if (hook && fd == hook->fromHook.get()) { for (auto c : data) if (c == '\n') { auto json = parseJSONMessage(currentHookLine); @@ -1381,7 +1435,7 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data (fields.size() > 0 ? fields[0].get() : "") + "\n"; logSize += logLine.size(); if (settings.maxLogSize && logSize > settings.maxLogSize) { - return tooMuchLogs(); + co_return tooMuchLogs(); } (*logSink)(logLine); } else if (type == resSetPhase && ! fields.is_null()) { @@ -1405,16 +1459,83 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data } else currentHookLine += c; } - - return StillAlive{}; +} catch (...) { + co_return std::current_exception(); } +kj::Promise> DerivationGoal::handleChildOutput() noexcept +try { + assert(builderOutFD); -void DerivationGoal::handleEOF(int fd) + auto builderIn = kj::heap(worker.aio.unixEventPort, builderOutFD->get()); + kj::Own hookIn; + if (hook) { + hookIn = kj::heap(worker.aio.unixEventPort, hook->fromHook.get()); + } + + auto handlers = handleChildStreams(*builderIn, hookIn.get()).attach(std::move(builderIn), std::move(hookIn)); + + if (respectsTimeouts() && settings.buildTimeout != 0) { + handlers = handlers.exclusiveJoin( + worker.aio.provider->getTimer() + .afterDelay(settings.buildTimeout.get() * kj::SECONDS) + .then([this]() -> Outcome { + return timedOut( + Error("%1% timed out after %2% seconds", name, settings.buildTimeout) + ); + }) + ); + } + + return handlers.then([this](auto r) -> Outcome { + if (!currentLogLine.empty()) flushLine(); + return r; + }); +} catch (...) { + return {std::current_exception()}; +} + +kj::Promise> DerivationGoal::monitorForSilence() noexcept { - if (!currentLogLine.empty()) flushLine(); + while (true) { + const auto stash = lastChildActivity; + auto waitUntil = lastChildActivity + settings.maxSilentTime.get() * kj::SECONDS; + co_await worker.aio.provider->getTimer().atTime(waitUntil); + if (lastChildActivity == stash) { + co_return timedOut( + Error("%1% timed out after %2% seconds of silence", name, settings.maxSilentTime) + ); + } + } } +kj::Promise> +DerivationGoal::handleChildStreams(InputStream & builderIn, InputStream * hookIn) noexcept +{ + lastChildActivity = worker.aio.provider->getTimer().now(); + + auto handlers = kj::joinPromisesFailFast([&] { + kj::Vector>> parts{2}; + + parts.add(handleBuilderOutput(builderIn)); + if (hookIn) { + parts.add(handleHookOutput(*hookIn)); + } + + return parts.releaseAsArray(); + }()); + + if (respectsTimeouts() && settings.maxSilentTime != 0) { + handlers = handlers.exclusiveJoin(monitorForSilence().then([](auto r) { + return kj::arr(std::move(r)); + })); + } + + for (auto r : co_await handlers) { + BOOST_OUTCOME_CO_TRYV(r); + } + co_return result::success(); +} void DerivationGoal::flushLine() { diff --git a/src/libstore/build/derivation-goal.hh b/src/libstore/build/derivation-goal.hh index 020388d5a..46b07fc0b 100644 --- a/src/libstore/build/derivation-goal.hh +++ b/src/libstore/build/derivation-goal.hh @@ -8,6 +8,7 @@ #include "store-api.hh" #include "pathlocks.hh" #include "goal.hh" +#include namespace nix { @@ -17,7 +18,7 @@ struct HookInstance; struct HookReplyBase { struct [[nodiscard]] Accept { - std::set fds; + kj::Promise> promise; }; struct [[nodiscard]] Decline {}; struct [[nodiscard]] Postpone {}; @@ -70,6 +71,8 @@ struct InitialOutput { */ struct DerivationGoal : public Goal { + struct InputStream; + /** * Whether to use an on-disk .drv file. */ @@ -242,7 +245,7 @@ struct DerivationGoal : public Goal BuildMode buildMode = bmNormal); virtual ~DerivationGoal() noexcept(false); - Finished timedOut(Error && ex) override; + Finished timedOut(Error && ex); std::string key() override; @@ -312,13 +315,19 @@ struct DerivationGoal : public Goal virtual void cleanupPostOutputsRegisteredModeCheck(); virtual void cleanupPostOutputsRegisteredModeNonCheck(); - /** - * Callback used by the worker to write to the log. - */ - WorkResult handleChildOutput(int fd, std::string_view data) override; - void handleEOF(int fd) override; +protected: + kj::TimePoint lastChildActivity = kj::minValue; + + kj::Promise> handleChildOutput() noexcept; + kj::Promise> + handleChildStreams(InputStream & builderIn, InputStream * hookIn) noexcept; + kj::Promise> handleBuilderOutput(InputStream & in) noexcept; + kj::Promise> handleHookOutput(InputStream & in) noexcept; + kj::Promise> monitorForSilence() noexcept; + Finished tooMuchLogs(); void flushLine(); +public: /** * Wrappers around the corresponding Store methods that first consult the * derivation. This is currently needed because when there is no drv file @@ -357,6 +366,11 @@ struct DerivationGoal : public Goal void waiteeDone(GoalPtr waitee) override; + virtual bool respectsTimeouts() + { + return false; + } + StorePathSet exportReferences(const StorePathSet & storePaths); JobCategory jobCategory() const override { diff --git a/src/libstore/build/drv-output-substitution-goal.cc b/src/libstore/build/drv-output-substitution-goal.cc index 7986123cc..fdee53699 100644 --- a/src/libstore/build/drv-output-substitution-goal.cc +++ b/src/libstore/build/drv-output-substitution-goal.cc @@ -69,24 +69,26 @@ try { some other error occurs), so it must not touch `this`. So put the shared state in a separate refcounted object. */ downloadState = std::make_shared(); - downloadState->outPipe.create(); + auto pipe = kj::newPromiseAndCrossThreadFulfiller(); + downloadState->outPipe = kj::mv(pipe.fulfiller); downloadState->result = std::async(std::launch::async, [downloadState{downloadState}, id{id}, sub{sub}] { + Finally updateStats([&]() { downloadState->outPipe->fulfill(); }); ReceiveInterrupts receiveInterrupts; - Finally updateStats([&]() { downloadState->outPipe.writeSide.close(); }); return sub->queryRealisation(id); }); state = &DrvOutputSubstitutionGoal::realisationFetched; - return {WaitForWorld{{downloadState->outPipe.readSide.get()}, true}}; + return {WaitForWorld{ + pipe.promise.then([]() -> Outcome { return result::success(); }), true + }}; } catch (...) { return {std::current_exception()}; } kj::Promise> DrvOutputSubstitutionGoal::realisationFetched(bool inBuildSlot) noexcept try { - worker.childTerminated(this); maintainRunningSubstitutions.reset(); try { diff --git a/src/libstore/build/drv-output-substitution-goal.hh b/src/libstore/build/drv-output-substitution-goal.hh index f33196665..a35bf67ee 100644 --- a/src/libstore/build/drv-output-substitution-goal.hh +++ b/src/libstore/build/drv-output-substitution-goal.hh @@ -45,7 +45,7 @@ class DrvOutputSubstitutionGoal : public Goal { struct DownloadState { - Pipe outPipe; + kj::Own> outPipe; std::future> result; }; @@ -74,8 +74,6 @@ public: kj::Promise> outPathValid(bool inBuildSlot) noexcept; kj::Promise> finished() noexcept; - Finished timedOut(Error && ex) override { abort(); }; - std::string key() override; kj::Promise> work(bool inBuildSlot) noexcept override; diff --git a/src/libstore/build/goal.hh b/src/libstore/build/goal.hh index 189505308..3f6e8396e 100644 --- a/src/libstore/build/goal.hh +++ b/src/libstore/build/goal.hh @@ -114,6 +114,8 @@ struct Goal public: + struct Finished; + struct [[nodiscard]] StillAlive {}; struct [[nodiscard]] WaitForSlot {}; struct [[nodiscard]] WaitForAWhile {}; @@ -122,7 +124,7 @@ public: Goals goals; }; struct [[nodiscard]] WaitForWorld { - std::set fds; + kj::Promise> promise; bool inBuildSlot; }; struct [[nodiscard]] Finished { @@ -167,20 +169,6 @@ public: virtual void waiteeDone(GoalPtr waitee) { } - virtual WorkResult handleChildOutput(int fd, std::string_view data) - { - abort(); - } - - virtual void handleEOF(int fd) - { - } - - virtual bool respectsTimeouts() - { - return false; - } - void trace(std::string_view s); std::string getName() const @@ -188,13 +176,6 @@ public: return name; } - /** - * Callback in case of a timeout. It should wake up its waiters, - * get rid of any running child processes that are being monitored - * by the worker (important!), etc. - */ - virtual Finished timedOut(Error && ex) = 0; - virtual std::string key() = 0; virtual void cleanup() { } diff --git a/src/libstore/build/local-derivation-goal.cc b/src/libstore/build/local-derivation-goal.cc index f14d09652..040fa7461 100644 --- a/src/libstore/build/local-derivation-goal.cc +++ b/src/libstore/build/local-derivation-goal.cc @@ -121,8 +121,6 @@ LocalStore & LocalDerivationGoal::getLocalStore() void LocalDerivationGoal::killChild() { if (pid) { - worker.childTerminated(this); - /* If we're using a build user, then there is a tricky race condition: if we kill the build user before the child has done its setuid() to the build user uid, then it won't be @@ -243,14 +241,14 @@ try { try { /* Okay, we have to build. */ - auto fds = startBuilder(); + auto promise = startBuilder(); /* This state will be reached when we get EOF on the child's log pipe. */ state = &DerivationGoal::buildDone; started(); - return {WaitForWorld{std::move(fds), true}}; + return {WaitForWorld{std::move(promise), true}}; } catch (BuildError & e) { outputLocks.unlock(); @@ -390,7 +388,9 @@ void LocalDerivationGoal::cleanupPostOutputsRegisteredModeNonCheck() cleanupPostOutputsRegisteredModeCheck(); } -std::set LocalDerivationGoal::startBuilder() +// NOTE this one isn't noexcept because it's called from places that expect +// exceptions to signal failure to launch. we should change this some time. +kj::Promise> LocalDerivationGoal::startBuilder() { if ((buildUser && buildUser->getUIDCount() != 1) #if __linux__ @@ -779,7 +779,7 @@ std::set LocalDerivationGoal::startBuilder() msgs.push_back(std::move(msg)); } - return {builderOutPTY.get()}; + return handleChildOutput(); } diff --git a/src/libstore/build/local-derivation-goal.hh b/src/libstore/build/local-derivation-goal.hh index cd040bc15..6239129ab 100644 --- a/src/libstore/build/local-derivation-goal.hh +++ b/src/libstore/build/local-derivation-goal.hh @@ -218,7 +218,7 @@ struct LocalDerivationGoal : public DerivationGoal /** * Start building a derivation. */ - std::set startBuilder(); + kj::Promise> startBuilder(); /** * Fill in the environment for the builder. diff --git a/src/libstore/build/substitution-goal.cc b/src/libstore/build/substitution-goal.cc index bd0ffcb9b..058f858d4 100644 --- a/src/libstore/build/substitution-goal.cc +++ b/src/libstore/build/substitution-goal.cc @@ -208,16 +208,17 @@ try { maintainRunningSubstitutions = worker.runningSubstitutions.addTemporarily(1); - outPipe.create(); + auto pipe = kj::newPromiseAndCrossThreadFulfiller(); + outPipe = kj::mv(pipe.fulfiller); thr = std::async(std::launch::async, [this]() { + /* Wake up the worker loop when we're done. */ + Finally updateStats([this]() { outPipe->fulfill(); }); + auto & fetchPath = subPath ? *subPath : storePath; try { ReceiveInterrupts receiveInterrupts; - /* Wake up the worker loop when we're done. */ - Finally updateStats([this]() { outPipe.writeSide.close(); }); - Activity act(*logger, actSubstitute, Logger::Fields{worker.store.printStorePath(storePath), sub->getUri()}); PushActivity pact(act.id); @@ -234,7 +235,9 @@ try { }); state = &PathSubstitutionGoal::finished; - return {WaitForWorld{{outPipe.readSide.get()}, true}}; + return {WaitForWorld{ + pipe.promise.then([]() -> Outcome { return result::success(); }), true + }}; } catch (...) { return {std::current_exception()}; } @@ -244,8 +247,6 @@ kj::Promise> PathSubstitutionGoal::finished(bool inBuil try { trace("substitute finished"); - worker.childTerminated(this); - try { thr.get(); } catch (std::exception & e) { @@ -288,22 +289,13 @@ try { } -Goal::WorkResult PathSubstitutionGoal::handleChildOutput(int fd, std::string_view data) -{ - return StillAlive{}; -} - - void PathSubstitutionGoal::cleanup() { try { if (thr.valid()) { // FIXME: signal worker thread to quit. thr.get(); - worker.childTerminated(this); } - - outPipe.close(); } catch (...) { ignoreException(); } diff --git a/src/libstore/build/substitution-goal.hh b/src/libstore/build/substitution-goal.hh index 3c97b19fd..91e256fd7 100644 --- a/src/libstore/build/substitution-goal.hh +++ b/src/libstore/build/substitution-goal.hh @@ -46,7 +46,7 @@ struct PathSubstitutionGoal : public Goal /** * Pipe for the substituter's standard output. */ - Pipe outPipe; + kj::Own> outPipe; /** * The substituter thread. @@ -90,8 +90,6 @@ public: ); ~PathSubstitutionGoal(); - Finished timedOut(Error && ex) override { abort(); }; - /** * We prepend "a$" to the key name to ensure substitution goals * happen before derivation goals. @@ -112,11 +110,6 @@ public: kj::Promise> tryToRun(bool inBuildSlot) noexcept; kj::Promise> finished(bool inBuildSlot) noexcept; - /** - * Callback used by the worker to write to the log. - */ - WorkResult handleChildOutput(int fd, std::string_view data) override; - /* Called by destructor, can't be overridden */ void cleanup() override final; diff --git a/src/libstore/build/worker.cc b/src/libstore/build/worker.cc index ee45c7e3f..284adbc50 100644 --- a/src/libstore/build/worker.cc +++ b/src/libstore/build/worker.cc @@ -7,10 +7,19 @@ #include "signals.hh" #include "hook-instance.hh" // IWYU pragma: keep -#include - namespace nix { +namespace { +struct ErrorHandler : kj::TaskSet::ErrorHandler +{ + void taskFailed(kj::Exception && e) override + { + printError("unexpected async failure in Worker: %s", kj::str(e).cStr()); + abort(); + } +} errorHandler; +} + Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio) : act(*logger, actRealise) , actDerivations(*logger, actBuilds) @@ -18,6 +27,7 @@ Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio) , store(store) , evalStore(evalStore) , aio(aio) + , children(errorHandler) { /* Debugging: prevent recursive workers. */ nrLocalBuilds = 0; @@ -33,6 +43,7 @@ Worker::~Worker() are in trouble, since goals may call childTerminated() etc. in their destructors). */ topGoals.clear(); + children.clear(); assert(expectedSubstitutions == 0); assert(expectedDownloadSize == 0); @@ -209,7 +220,9 @@ void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how) dep->waiters.insert(goal); } }, - [&](Goal::WaitForWorld & w) { childStarted(goal, w.fds, w.inBuildSlot); }, + [&](Goal::WaitForWorld & w) { + childStarted(goal, std::move(w.promise), w.inBuildSlot); + }, [&](Goal::Finished & f) { goalFinished(goal, f); }, }, how @@ -244,16 +257,22 @@ void Worker::wakeUp(GoalPtr goal) } -void Worker::childStarted(GoalPtr goal, const std::set & fds, +void Worker::childStarted(GoalPtr goal, kj::Promise> promise, bool inBuildSlot) { - Child child; - child.goal = goal; - child.goal2 = goal.get(); - child.fds = fds; - child.timeStarted = child.lastOutput = steady_time_point::clock::now(); - child.inBuildSlot = inBuildSlot; - children.emplace_back(child); + children.add(promise + .then([this, goal](auto result) { + if (result.has_value()) { + handleWorkResult(goal, Goal::ContinueImmediately{}); + } else if (result.has_error()) { + handleWorkResult(goal, std::move(result.assume_error())); + } else { + childException = result.assume_exception(); + } + }) + .attach(Finally{[this, goal, inBuildSlot] { + childTerminated(goal, inBuildSlot); + }})); if (inBuildSlot) { switch (goal->jobCategory()) { case JobCategory::Substitution: @@ -269,13 +288,13 @@ void Worker::childStarted(GoalPtr goal, const std::set & fds, } -void Worker::childTerminated(Goal * goal) +void Worker::childTerminated(GoalPtr goal, bool inBuildSlot) { - auto i = std::find_if(children.begin(), children.end(), - [&](const Child & child) { return child.goal2 == goal; }); - if (i == children.end()) return; + if (childFinished) { + childFinished->fulfill(); + } - if (i->inBuildSlot) { + if (inBuildSlot) { switch (goal->jobCategory()) { case JobCategory::Substitution: assert(nrSubstitutions > 0); @@ -290,8 +309,6 @@ void Worker::childTerminated(Goal * goal) } } - children.erase(i); - /* Wake up goals waiting for a build slot. */ for (auto & j : wantingToBuild) { GoalPtr goal = j.lock(); @@ -390,11 +407,15 @@ Goals Worker::run(std::function req) if (topGoals.empty()) break; /* Wait for input. */ - if (!children.empty() || !waitingForAWhile.empty()) + if (!children.isEmpty() || !waitingForAWhile.empty()) waitForInput(); else { assert(!awake.empty()); } + + if (childException) { + std::rethrow_exception(childException); + } } /* If --keep-going is not set, it's possible that the main goal @@ -402,7 +423,7 @@ Goals Worker::run(std::function req) --keep-going *is* set, then they must all be finished now. */ assert(!settings.keepGoing || awake.empty()); assert(!settings.keepGoing || wantingToBuild.empty()); - assert(!settings.keepGoing || children.empty()); + assert(!settings.keepGoing || children.isEmpty()); return _topGoals; } @@ -411,140 +432,52 @@ void Worker::waitForInput() { printMsg(lvlVomit, "waiting for children"); + auto childFinished = [&]{ + auto pair = kj::newPromiseAndFulfiller(); + this->childFinished = kj::mv(pair.fulfiller); + return kj::mv(pair.promise); + }(); + /* Process output from the file descriptors attached to the children, namely log output and output path creation commands. We also use this to detect child termination: if we get EOF on the logger pipe of a build, we assume that the builder has terminated. */ - bool useTimeout = false; - long timeout = 0; + std::optional timeout = 0; auto before = steady_time_point::clock::now(); - /* If we're monitoring for silence on stdout/stderr, or if there - is a build timeout, then wait for input until the first - deadline for any child. */ - auto nearest = steady_time_point::max(); // nearest deadline - if (settings.minFree.get() != 0) - // Periodicallty wake up to see if we need to run the garbage collector. - nearest = before + std::chrono::seconds(10); - for (auto & i : children) { - if (auto goal = i.goal.lock()) { - if (!goal->respectsTimeouts()) continue; - if (0 != settings.maxSilentTime) - nearest = std::min(nearest, i.lastOutput + std::chrono::seconds(settings.maxSilentTime)); - if (0 != settings.buildTimeout) - nearest = std::min(nearest, i.timeStarted + std::chrono::seconds(settings.buildTimeout)); - } - } - if (nearest != steady_time_point::max()) { - timeout = std::max(1L, (long) std::chrono::duration_cast(nearest - before).count()); - useTimeout = true; + // Periodicallty wake up to see if we need to run the garbage collector. + if (settings.minFree.get() != 0) { + timeout = 10; } /* If we are polling goals that are waiting for a lock, then wake up after a few seconds at most. */ if (!waitingForAWhile.empty()) { - useTimeout = true; if (lastWokenUp == steady_time_point::min() || lastWokenUp > before) lastWokenUp = before; timeout = std::max(1L, (long) std::chrono::duration_cast( lastWokenUp + std::chrono::seconds(settings.pollInterval) - before).count()); } else lastWokenUp = steady_time_point::min(); - if (useTimeout) - vomit("sleeping %d seconds", timeout); + if (timeout) + vomit("sleeping %d seconds", *timeout); - /* Use select() to wait for the input side of any logger pipe to - become `available'. Note that `available' (i.e., non-blocking) - includes EOF. */ - std::vector pollStatus; - std::map fdToPollStatus; - for (auto & i : children) { - for (auto & j : i.fds) { - pollStatus.push_back((struct pollfd) { .fd = j, .events = POLLIN }); - fdToPollStatus[j] = pollStatus.size() - 1; + auto waitFor = [&] { + if (timeout) { + return aio.provider->getTimer() + .afterDelay(*timeout * kj::SECONDS) + .exclusiveJoin(kj::mv(childFinished)); + } else { + return std::move(childFinished); } - } + }(); - if (poll(pollStatus.data(), pollStatus.size(), - useTimeout ? timeout * 1000 : -1) == -1) { - if (errno == EINTR) return; - throw SysError("waiting for input"); - } + waitFor.wait(aio.waitScope); auto after = steady_time_point::clock::now(); - /* Process all available file descriptors. FIXME: this is - O(children * fds). */ - decltype(children)::iterator i; - for (auto j = children.begin(); j != children.end(); j = i) { - i = std::next(j); - - checkInterrupt(); - - GoalPtr goal = j->goal.lock(); - assert(goal); - - if (!goal->exitCode.has_value() && - 0 != settings.maxSilentTime && - goal->respectsTimeouts() && - after - j->lastOutput >= std::chrono::seconds(settings.maxSilentTime)) - { - handleWorkResult( - goal, - goal->timedOut(Error( - "%1% timed out after %2% seconds of silence", - goal->getName(), - settings.maxSilentTime - )) - ); - continue; - } - - else if (!goal->exitCode.has_value() && - 0 != settings.buildTimeout && - goal->respectsTimeouts() && - after - j->timeStarted >= std::chrono::seconds(settings.buildTimeout)) - { - handleWorkResult( - goal, - goal->timedOut( - Error("%1% timed out after %2% seconds", goal->getName(), settings.buildTimeout) - ) - ); - continue; - } - - std::set fds2(j->fds); - std::vector buffer(4096); - for (auto & k : fds2) { - const auto fdPollStatusId = get(fdToPollStatus, k); - assert(fdPollStatusId); - assert(*fdPollStatusId < pollStatus.size()); - if (pollStatus.at(*fdPollStatusId).revents) { - ssize_t rd = ::read(k, buffer.data(), buffer.size()); - // FIXME: is there a cleaner way to handle pt close - // than EIO? Is this even standard? - if (rd == 0 || (rd == -1 && errno == EIO)) { - debug("%1%: got EOF", goal->getName()); - goal->handleEOF(k); - handleWorkResult(goal, Goal::ContinueImmediately{}); - j->fds.erase(k); - } else if (rd == -1) { - if (errno != EINTR) - throw SysError("%s: read failed", goal->getName()); - } else { - printMsg(lvlVomit, "%1%: read %2% bytes", - goal->getName(), rd); - std::string_view data(charptr_cast(buffer.data()), rd); - j->lastOutput = after; - handleWorkResult(goal, goal->handleChildOutput(k, data)); - } - } - } - } - if (!waitingForAWhile.empty() && lastWokenUp + std::chrono::seconds(settings.pollInterval) <= after) { lastWokenUp = after; for (auto & i : waitingForAWhile) { diff --git a/src/libstore/build/worker.hh b/src/libstore/build/worker.hh index 6735ea0b9..37d80ba7b 100644 --- a/src/libstore/build/worker.hh +++ b/src/libstore/build/worker.hh @@ -21,24 +21,6 @@ class DrvOutputSubstitutionGoal; typedef std::chrono::time_point steady_time_point; -/** - * A mapping used to remember for each child process to what goal it - * belongs, and file descriptors for receiving log data and output - * path creation commands. - */ -struct Child -{ - WeakGoalPtr goal; - Goal * goal2; // ugly hackery - std::set fds; - bool inBuildSlot; - /** - * Time we last got output on stdout/stderr - */ - steady_time_point lastOutput; - steady_time_point timeStarted; -}; - /* Forward definition. */ struct HookInstance; @@ -116,11 +98,6 @@ private: */ WeakGoals wantingToBuild; - /** - * Child processes currently running. - */ - std::list children; - /** * Number of build slots occupied. This includes local builds but does not * include substitutions or remote builds via the build hook. @@ -179,6 +156,8 @@ private: void goalFinished(GoalPtr goal, Goal::Finished & f); void handleWorkResult(GoalPtr goal, Goal::WorkResult how); + kj::Own> childFinished; + /** * Put `goal` to sleep until a build slot becomes available (which * might be right away). @@ -212,9 +191,14 @@ private: * Registers a running child process. `inBuildSlot` means that * the process counts towards the jobs limit. */ - void childStarted(GoalPtr goal, const std::set & fds, + void childStarted(GoalPtr goal, kj::Promise> promise, bool inBuildSlot); + /** + * Unregisters a running child process. + */ + void childTerminated(GoalPtr goal, bool inBuildSlot); + /** * Pass current stats counters to the logger for progress bar updates. */ @@ -240,6 +224,11 @@ public: Store & evalStore; kj::AsyncIoContext & aio; +private: + kj::TaskSet children; + std::exception_ptr childException; + +public: struct HookState { std::unique_ptr instance; @@ -302,11 +291,6 @@ private: GoalPtr makeGoal(const DerivedPath & req, BuildMode buildMode = bmNormal) override; public: - /** - * Unregisters a running child process. - */ - void childTerminated(Goal * goal); - /** * Loop until the specified top-level goals have finished. */