forked from lix-project/lix
Fix tests on systems with a non-master git defaultBranch #1
11 changed files with 271 additions and 253 deletions
|
@ -11,7 +11,10 @@
|
|||
#include "drv-output-substitution-goal.hh"
|
||||
#include "strings.hh"
|
||||
|
||||
#include <boost/outcome/try.hpp>
|
||||
#include <fstream>
|
||||
#include <kj/async-unix.h>
|
||||
#include <kj/debug.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/socket.h>
|
||||
#include <sys/un.h>
|
||||
|
@ -780,7 +783,7 @@ try {
|
|||
buildResult.startTime = time(0); // inexact
|
||||
state = &DerivationGoal::buildDone;
|
||||
started();
|
||||
return WaitForWorld{std::move(a.fds), false};
|
||||
return WaitForWorld{std::move(a.promise), false};
|
||||
},
|
||||
[&](HookReply::Postpone) -> std::optional<WorkResult> {
|
||||
/* Not now; wait until at least one child finishes or
|
||||
|
@ -992,9 +995,6 @@ try {
|
|||
buildResult.timesBuilt++;
|
||||
buildResult.stopTime = time(0);
|
||||
|
||||
/* So the child is gone now. */
|
||||
worker.childTerminated(this);
|
||||
|
||||
/* Close the read side of the logger pipe. */
|
||||
closeReadPipes();
|
||||
|
||||
|
@ -1266,12 +1266,8 @@ HookReply DerivationGoal::tryBuildHook(bool inBuildSlot)
|
|||
/* Create the log file and pipe. */
|
||||
Path logFile = openLogFile();
|
||||
|
||||
std::set<int> fds;
|
||||
fds.insert(hook->fromHook.get());
|
||||
fds.insert(hook->builderOut.get());
|
||||
builderOutFD = &hook->builderOut;
|
||||
|
||||
return HookReply::Accept{std::move(fds)};
|
||||
return HookReply::Accept{handleChildOutput()};
|
||||
}
|
||||
|
||||
|
||||
|
@ -1331,23 +1327,69 @@ void DerivationGoal::closeLogFile()
|
|||
}
|
||||
|
||||
|
||||
Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data)
|
||||
Goal::Finished DerivationGoal::tooMuchLogs()
|
||||
{
|
||||
assert(builderOutFD);
|
||||
|
||||
auto tooMuchLogs = [&] {
|
||||
killChild();
|
||||
return done(
|
||||
BuildResult::LogLimitExceeded, {},
|
||||
Error("%s killed after writing more than %d bytes of log output",
|
||||
getName(), settings.maxLogSize));
|
||||
};
|
||||
}
|
||||
|
||||
struct DerivationGoal::InputStream final : private kj::AsyncObject
|
||||
{
|
||||
int fd;
|
||||
kj::UnixEventPort::FdObserver observer;
|
||||
|
||||
InputStream(kj::UnixEventPort & ep, int fd)
|
||||
: fd(fd)
|
||||
, observer(ep, fd, kj::UnixEventPort::FdObserver::OBSERVE_READ)
|
||||
{
|
||||
int flags = fcntl(fd, F_GETFL);
|
||||
if (flags < 0) {
|
||||
throw SysError("fcntl(F_GETFL) failed on fd %i", fd);
|
||||
}
|
||||
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
|
||||
throw SysError("fcntl(F_SETFL) failed on fd %i", fd);
|
||||
}
|
||||
}
|
||||
|
||||
kj::Promise<std::string_view> read(kj::ArrayPtr<char> buffer)
|
||||
{
|
||||
const auto res = ::read(fd, buffer.begin(), buffer.size());
|
||||
// closing a pty endpoint causes EIO on the other endpoint. stock kj streams
|
||||
// do not handle this and throw exceptions we can't ask for errno instead :(
|
||||
// (we can't use `errno` either because kj may well have mangled it by now.)
|
||||
if (res == 0 || (res == -1 && errno == EIO)) {
|
||||
return std::string_view{};
|
||||
}
|
||||
|
||||
KJ_NONBLOCKING_SYSCALL(res) {}
|
||||
|
||||
if (res > 0) {
|
||||
return std::string_view{buffer.begin(), static_cast<size_t>(res)};
|
||||
}
|
||||
|
||||
return observer.whenBecomesReadable().then([this, buffer] {
|
||||
return read(buffer);
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::handleBuilderOutput(InputStream & in) noexcept
|
||||
try {
|
||||
auto buf = kj::heapArray<char>(4096);
|
||||
while (true) {
|
||||
auto data = co_await in.read(buf);
|
||||
lastChildActivity = worker.aio.provider->getTimer().now();
|
||||
|
||||
if (data.empty()) {
|
||||
co_return result::success();
|
||||
}
|
||||
|
||||
// local & `ssh://`-builds are dealt with here.
|
||||
if (fd == builderOutFD->get()) {
|
||||
logSize += data.size();
|
||||
if (settings.maxLogSize && logSize > settings.maxLogSize) {
|
||||
return tooMuchLogs();
|
||||
co_return tooMuchLogs();
|
||||
}
|
||||
|
||||
for (auto c : data)
|
||||
|
@ -1362,10 +1404,22 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data
|
|||
}
|
||||
|
||||
if (logSink) (*logSink)(data);
|
||||
return StillAlive{};
|
||||
}
|
||||
} catch (...) {
|
||||
co_return std::current_exception();
|
||||
}
|
||||
|
||||
kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::handleHookOutput(InputStream & in) noexcept
|
||||
try {
|
||||
auto buf = kj::heapArray<char>(4096);
|
||||
while (true) {
|
||||
auto data = co_await in.read(buf);
|
||||
lastChildActivity = worker.aio.provider->getTimer().now();
|
||||
|
||||
if (data.empty()) {
|
||||
co_return result::success();
|
||||
}
|
||||
|
||||
if (hook && fd == hook->fromHook.get()) {
|
||||
for (auto c : data)
|
||||
if (c == '\n') {
|
||||
auto json = parseJSONMessage(currentHookLine);
|
||||
|
@ -1381,7 +1435,7 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data
|
|||
(fields.size() > 0 ? fields[0].get<std::string>() : "") + "\n";
|
||||
logSize += logLine.size();
|
||||
if (settings.maxLogSize && logSize > settings.maxLogSize) {
|
||||
return tooMuchLogs();
|
||||
co_return tooMuchLogs();
|
||||
}
|
||||
(*logSink)(logLine);
|
||||
} else if (type == resSetPhase && ! fields.is_null()) {
|
||||
|
@ -1405,16 +1459,83 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data
|
|||
} else
|
||||
currentHookLine += c;
|
||||
}
|
||||
|
||||
return StillAlive{};
|
||||
} catch (...) {
|
||||
co_return std::current_exception();
|
||||
}
|
||||
|
||||
kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::handleChildOutput() noexcept
|
||||
try {
|
||||
assert(builderOutFD);
|
||||
|
||||
void DerivationGoal::handleEOF(int fd)
|
||||
{
|
||||
auto builderIn = kj::heap<InputStream>(worker.aio.unixEventPort, builderOutFD->get());
|
||||
kj::Own<InputStream> hookIn;
|
||||
if (hook) {
|
||||
hookIn = kj::heap<InputStream>(worker.aio.unixEventPort, hook->fromHook.get());
|
||||
}
|
||||
|
||||
auto handlers = handleChildStreams(*builderIn, hookIn.get()).attach(std::move(builderIn), std::move(hookIn));
|
||||
|
||||
if (respectsTimeouts() && settings.buildTimeout != 0) {
|
||||
handlers = handlers.exclusiveJoin(
|
||||
worker.aio.provider->getTimer()
|
||||
.afterDelay(settings.buildTimeout.get() * kj::SECONDS)
|
||||
.then([this]() -> Outcome<void, Finished> {
|
||||
return timedOut(
|
||||
Error("%1% timed out after %2% seconds", name, settings.buildTimeout)
|
||||
);
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
return handlers.then([this](auto r) -> Outcome<void, Finished> {
|
||||
if (!currentLogLine.empty()) flushLine();
|
||||
return r;
|
||||
});
|
||||
} catch (...) {
|
||||
return {std::current_exception()};
|
||||
}
|
||||
|
||||
kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::monitorForSilence() noexcept
|
||||
{
|
||||
while (true) {
|
||||
const auto stash = lastChildActivity;
|
||||
auto waitUntil = lastChildActivity + settings.maxSilentTime.get() * kj::SECONDS;
|
||||
co_await worker.aio.provider->getTimer().atTime(waitUntil);
|
||||
if (lastChildActivity == stash) {
|
||||
co_return timedOut(
|
||||
Error("%1% timed out after %2% seconds of silence", name, settings.maxSilentTime)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
kj::Promise<Outcome<void, Goal::Finished>>
|
||||
DerivationGoal::handleChildStreams(InputStream & builderIn, InputStream * hookIn) noexcept
|
||||
{
|
||||
lastChildActivity = worker.aio.provider->getTimer().now();
|
||||
|
||||
auto handlers = kj::joinPromisesFailFast([&] {
|
||||
kj::Vector<kj::Promise<Outcome<void, Finished>>> parts{2};
|
||||
|
||||
parts.add(handleBuilderOutput(builderIn));
|
||||
if (hookIn) {
|
||||
parts.add(handleHookOutput(*hookIn));
|
||||
}
|
||||
|
||||
return parts.releaseAsArray();
|
||||
}());
|
||||
|
||||
if (respectsTimeouts() && settings.maxSilentTime != 0) {
|
||||
handlers = handlers.exclusiveJoin(monitorForSilence().then([](auto r) {
|
||||
return kj::arr(std::move(r));
|
||||
}));
|
||||
}
|
||||
|
||||
for (auto r : co_await handlers) {
|
||||
BOOST_OUTCOME_CO_TRYV(r);
|
||||
}
|
||||
co_return result::success();
|
||||
}
|
||||
|
||||
void DerivationGoal::flushLine()
|
||||
{
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
#include "store-api.hh"
|
||||
#include "pathlocks.hh"
|
||||
#include "goal.hh"
|
||||
#include <kj/time.h>
|
||||
|
||||
namespace nix {
|
||||
|
||||
|
@ -17,7 +18,7 @@ struct HookInstance;
|
|||
|
||||
struct HookReplyBase {
|
||||
struct [[nodiscard]] Accept {
|
||||
std::set<int> fds;
|
||||
kj::Promise<Outcome<void, Goal::Finished>> promise;
|
||||
};
|
||||
struct [[nodiscard]] Decline {};
|
||||
struct [[nodiscard]] Postpone {};
|
||||
|
@ -70,6 +71,8 @@ struct InitialOutput {
|
|||
*/
|
||||
struct DerivationGoal : public Goal
|
||||
{
|
||||
struct InputStream;
|
||||
|
||||
/**
|
||||
* Whether to use an on-disk .drv file.
|
||||
*/
|
||||
|
@ -242,7 +245,7 @@ struct DerivationGoal : public Goal
|
|||
BuildMode buildMode = bmNormal);
|
||||
virtual ~DerivationGoal() noexcept(false);
|
||||
|
||||
Finished timedOut(Error && ex) override;
|
||||
Finished timedOut(Error && ex);
|
||||
|
||||
std::string key() override;
|
||||
|
||||
|
@ -312,13 +315,19 @@ struct DerivationGoal : public Goal
|
|||
virtual void cleanupPostOutputsRegisteredModeCheck();
|
||||
virtual void cleanupPostOutputsRegisteredModeNonCheck();
|
||||
|
||||
/**
|
||||
* Callback used by the worker to write to the log.
|
||||
*/
|
||||
WorkResult handleChildOutput(int fd, std::string_view data) override;
|
||||
void handleEOF(int fd) override;
|
||||
protected:
|
||||
kj::TimePoint lastChildActivity = kj::minValue;
|
||||
|
||||
kj::Promise<Outcome<void, Finished>> handleChildOutput() noexcept;
|
||||
kj::Promise<Outcome<void, Finished>>
|
||||
handleChildStreams(InputStream & builderIn, InputStream * hookIn) noexcept;
|
||||
kj::Promise<Outcome<void, Finished>> handleBuilderOutput(InputStream & in) noexcept;
|
||||
kj::Promise<Outcome<void, Finished>> handleHookOutput(InputStream & in) noexcept;
|
||||
kj::Promise<Outcome<void, Finished>> monitorForSilence() noexcept;
|
||||
Finished tooMuchLogs();
|
||||
void flushLine();
|
||||
|
||||
public:
|
||||
/**
|
||||
* Wrappers around the corresponding Store methods that first consult the
|
||||
* derivation. This is currently needed because when there is no drv file
|
||||
|
@ -357,6 +366,11 @@ struct DerivationGoal : public Goal
|
|||
|
||||
void waiteeDone(GoalPtr waitee) override;
|
||||
|
||||
virtual bool respectsTimeouts()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
StorePathSet exportReferences(const StorePathSet & storePaths);
|
||||
|
||||
JobCategory jobCategory() const override {
|
||||
|
|
|
@ -69,24 +69,26 @@ try {
|
|||
some other error occurs), so it must not touch `this`. So put
|
||||
the shared state in a separate refcounted object. */
|
||||
downloadState = std::make_shared<DownloadState>();
|
||||
downloadState->outPipe.create();
|
||||
auto pipe = kj::newPromiseAndCrossThreadFulfiller<void>();
|
||||
downloadState->outPipe = kj::mv(pipe.fulfiller);
|
||||
|
||||
downloadState->result =
|
||||
std::async(std::launch::async, [downloadState{downloadState}, id{id}, sub{sub}] {
|
||||
Finally updateStats([&]() { downloadState->outPipe->fulfill(); });
|
||||
ReceiveInterrupts receiveInterrupts;
|
||||
Finally updateStats([&]() { downloadState->outPipe.writeSide.close(); });
|
||||
return sub->queryRealisation(id);
|
||||
});
|
||||
|
||||
state = &DrvOutputSubstitutionGoal::realisationFetched;
|
||||
return {WaitForWorld{{downloadState->outPipe.readSide.get()}, true}};
|
||||
return {WaitForWorld{
|
||||
pipe.promise.then([]() -> Outcome<void, Finished> { return result::success(); }), true
|
||||
}};
|
||||
} catch (...) {
|
||||
return {std::current_exception()};
|
||||
}
|
||||
|
||||
kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::realisationFetched(bool inBuildSlot) noexcept
|
||||
try {
|
||||
worker.childTerminated(this);
|
||||
maintainRunningSubstitutions.reset();
|
||||
|
||||
try {
|
||||
|
|
|
@ -45,7 +45,7 @@ class DrvOutputSubstitutionGoal : public Goal {
|
|||
|
||||
struct DownloadState
|
||||
{
|
||||
Pipe outPipe;
|
||||
kj::Own<kj::CrossThreadPromiseFulfiller<void>> outPipe;
|
||||
std::future<std::shared_ptr<const Realisation>> result;
|
||||
};
|
||||
|
||||
|
@ -74,8 +74,6 @@ public:
|
|||
kj::Promise<Result<WorkResult>> outPathValid(bool inBuildSlot) noexcept;
|
||||
kj::Promise<Result<WorkResult>> finished() noexcept;
|
||||
|
||||
Finished timedOut(Error && ex) override { abort(); };
|
||||
|
||||
std::string key() override;
|
||||
|
||||
kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept override;
|
||||
|
|
|
@ -114,6 +114,8 @@ struct Goal
|
|||
|
||||
public:
|
||||
|
||||
struct Finished;
|
||||
|
||||
struct [[nodiscard]] StillAlive {};
|
||||
struct [[nodiscard]] WaitForSlot {};
|
||||
struct [[nodiscard]] WaitForAWhile {};
|
||||
|
@ -122,7 +124,7 @@ public:
|
|||
Goals goals;
|
||||
};
|
||||
struct [[nodiscard]] WaitForWorld {
|
||||
std::set<int> fds;
|
||||
kj::Promise<Outcome<void, Finished>> promise;
|
||||
bool inBuildSlot;
|
||||
};
|
||||
struct [[nodiscard]] Finished {
|
||||
|
@ -167,20 +169,6 @@ public:
|
|||
|
||||
virtual void waiteeDone(GoalPtr waitee) { }
|
||||
|
||||
virtual WorkResult handleChildOutput(int fd, std::string_view data)
|
||||
{
|
||||
abort();
|
||||
}
|
||||
|
||||
virtual void handleEOF(int fd)
|
||||
{
|
||||
}
|
||||
|
||||
virtual bool respectsTimeouts()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
void trace(std::string_view s);
|
||||
|
||||
std::string getName() const
|
||||
|
@ -188,13 +176,6 @@ public:
|
|||
return name;
|
||||
}
|
||||
|
||||
/**
|
||||
* Callback in case of a timeout. It should wake up its waiters,
|
||||
* get rid of any running child processes that are being monitored
|
||||
* by the worker (important!), etc.
|
||||
*/
|
||||
virtual Finished timedOut(Error && ex) = 0;
|
||||
|
||||
virtual std::string key() = 0;
|
||||
|
||||
virtual void cleanup() { }
|
||||
|
|
|
@ -121,8 +121,6 @@ LocalStore & LocalDerivationGoal::getLocalStore()
|
|||
void LocalDerivationGoal::killChild()
|
||||
{
|
||||
if (pid) {
|
||||
worker.childTerminated(this);
|
||||
|
||||
/* If we're using a build user, then there is a tricky race
|
||||
condition: if we kill the build user before the child has
|
||||
done its setuid() to the build user uid, then it won't be
|
||||
|
@ -243,14 +241,14 @@ try {
|
|||
try {
|
||||
|
||||
/* Okay, we have to build. */
|
||||
auto fds = startBuilder();
|
||||
auto promise = startBuilder();
|
||||
|
||||
/* This state will be reached when we get EOF on the child's
|
||||
log pipe. */
|
||||
state = &DerivationGoal::buildDone;
|
||||
|
||||
started();
|
||||
return {WaitForWorld{std::move(fds), true}};
|
||||
return {WaitForWorld{std::move(promise), true}};
|
||||
|
||||
} catch (BuildError & e) {
|
||||
outputLocks.unlock();
|
||||
|
@ -390,7 +388,9 @@ void LocalDerivationGoal::cleanupPostOutputsRegisteredModeNonCheck()
|
|||
cleanupPostOutputsRegisteredModeCheck();
|
||||
}
|
||||
|
||||
std::set<int> LocalDerivationGoal::startBuilder()
|
||||
// NOTE this one isn't noexcept because it's called from places that expect
|
||||
// exceptions to signal failure to launch. we should change this some time.
|
||||
kj::Promise<Outcome<void, Goal::Finished>> LocalDerivationGoal::startBuilder()
|
||||
{
|
||||
if ((buildUser && buildUser->getUIDCount() != 1)
|
||||
#if __linux__
|
||||
|
@ -779,7 +779,7 @@ std::set<int> LocalDerivationGoal::startBuilder()
|
|||
msgs.push_back(std::move(msg));
|
||||
}
|
||||
|
||||
return {builderOutPTY.get()};
|
||||
return handleChildOutput();
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -218,7 +218,7 @@ struct LocalDerivationGoal : public DerivationGoal
|
|||
/**
|
||||
* Start building a derivation.
|
||||
*/
|
||||
std::set<int> startBuilder();
|
||||
kj::Promise<Outcome<void, Finished>> startBuilder();
|
||||
|
||||
/**
|
||||
* Fill in the environment for the builder.
|
||||
|
|
|
@ -208,16 +208,17 @@ try {
|
|||
|
||||
maintainRunningSubstitutions = worker.runningSubstitutions.addTemporarily(1);
|
||||
|
||||
outPipe.create();
|
||||
auto pipe = kj::newPromiseAndCrossThreadFulfiller<void>();
|
||||
outPipe = kj::mv(pipe.fulfiller);
|
||||
|
||||
thr = std::async(std::launch::async, [this]() {
|
||||
/* Wake up the worker loop when we're done. */
|
||||
Finally updateStats([this]() { outPipe->fulfill(); });
|
||||
|
||||
auto & fetchPath = subPath ? *subPath : storePath;
|
||||
try {
|
||||
ReceiveInterrupts receiveInterrupts;
|
||||
|
||||
/* Wake up the worker loop when we're done. */
|
||||
Finally updateStats([this]() { outPipe.writeSide.close(); });
|
||||
|
||||
Activity act(*logger, actSubstitute, Logger::Fields{worker.store.printStorePath(storePath), sub->getUri()});
|
||||
PushActivity pact(act.id);
|
||||
|
||||
|
@ -234,7 +235,9 @@ try {
|
|||
});
|
||||
|
||||
state = &PathSubstitutionGoal::finished;
|
||||
return {WaitForWorld{{outPipe.readSide.get()}, true}};
|
||||
return {WaitForWorld{
|
||||
pipe.promise.then([]() -> Outcome<void, Finished> { return result::success(); }), true
|
||||
}};
|
||||
} catch (...) {
|
||||
return {std::current_exception()};
|
||||
}
|
||||
|
@ -244,8 +247,6 @@ kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::finished(bool inBuil
|
|||
try {
|
||||
trace("substitute finished");
|
||||
|
||||
worker.childTerminated(this);
|
||||
|
||||
try {
|
||||
thr.get();
|
||||
} catch (std::exception & e) {
|
||||
|
@ -288,22 +289,13 @@ try {
|
|||
}
|
||||
|
||||
|
||||
Goal::WorkResult PathSubstitutionGoal::handleChildOutput(int fd, std::string_view data)
|
||||
{
|
||||
return StillAlive{};
|
||||
}
|
||||
|
||||
|
||||
void PathSubstitutionGoal::cleanup()
|
||||
{
|
||||
try {
|
||||
if (thr.valid()) {
|
||||
// FIXME: signal worker thread to quit.
|
||||
thr.get();
|
||||
worker.childTerminated(this);
|
||||
}
|
||||
|
||||
outPipe.close();
|
||||
} catch (...) {
|
||||
ignoreException();
|
||||
}
|
||||
|
|
|
@ -46,7 +46,7 @@ struct PathSubstitutionGoal : public Goal
|
|||
/**
|
||||
* Pipe for the substituter's standard output.
|
||||
*/
|
||||
Pipe outPipe;
|
||||
kj::Own<kj::CrossThreadPromiseFulfiller<void>> outPipe;
|
||||
|
||||
/**
|
||||
* The substituter thread.
|
||||
|
@ -90,8 +90,6 @@ public:
|
|||
);
|
||||
~PathSubstitutionGoal();
|
||||
|
||||
Finished timedOut(Error && ex) override { abort(); };
|
||||
|
||||
/**
|
||||
* We prepend "a$" to the key name to ensure substitution goals
|
||||
* happen before derivation goals.
|
||||
|
@ -112,11 +110,6 @@ public:
|
|||
kj::Promise<Result<WorkResult>> tryToRun(bool inBuildSlot) noexcept;
|
||||
kj::Promise<Result<WorkResult>> finished(bool inBuildSlot) noexcept;
|
||||
|
||||
/**
|
||||
* Callback used by the worker to write to the log.
|
||||
*/
|
||||
WorkResult handleChildOutput(int fd, std::string_view data) override;
|
||||
|
||||
/* Called by destructor, can't be overridden */
|
||||
void cleanup() override final;
|
||||
|
||||
|
|
|
@ -7,10 +7,19 @@
|
|||
#include "signals.hh"
|
||||
#include "hook-instance.hh" // IWYU pragma: keep
|
||||
|
||||
#include <poll.h>
|
||||
|
||||
namespace nix {
|
||||
|
||||
namespace {
|
||||
struct ErrorHandler : kj::TaskSet::ErrorHandler
|
||||
{
|
||||
void taskFailed(kj::Exception && e) override
|
||||
{
|
||||
printError("unexpected async failure in Worker: %s", kj::str(e).cStr());
|
||||
abort();
|
||||
}
|
||||
} errorHandler;
|
||||
}
|
||||
|
||||
Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio)
|
||||
: act(*logger, actRealise)
|
||||
, actDerivations(*logger, actBuilds)
|
||||
|
@ -18,6 +27,7 @@ Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio)
|
|||
, store(store)
|
||||
, evalStore(evalStore)
|
||||
, aio(aio)
|
||||
, children(errorHandler)
|
||||
{
|
||||
/* Debugging: prevent recursive workers. */
|
||||
nrLocalBuilds = 0;
|
||||
|
@ -33,6 +43,7 @@ Worker::~Worker()
|
|||
are in trouble, since goals may call childTerminated() etc. in
|
||||
their destructors). */
|
||||
topGoals.clear();
|
||||
children.clear();
|
||||
|
||||
assert(expectedSubstitutions == 0);
|
||||
assert(expectedDownloadSize == 0);
|
||||
|
@ -209,7 +220,9 @@ void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how)
|
|||
dep->waiters.insert(goal);
|
||||
}
|
||||
},
|
||||
[&](Goal::WaitForWorld & w) { childStarted(goal, w.fds, w.inBuildSlot); },
|
||||
[&](Goal::WaitForWorld & w) {
|
||||
childStarted(goal, std::move(w.promise), w.inBuildSlot);
|
||||
},
|
||||
[&](Goal::Finished & f) { goalFinished(goal, f); },
|
||||
},
|
||||
how
|
||||
|
@ -244,16 +257,22 @@ void Worker::wakeUp(GoalPtr goal)
|
|||
}
|
||||
|
||||
|
||||
void Worker::childStarted(GoalPtr goal, const std::set<int> & fds,
|
||||
void Worker::childStarted(GoalPtr goal, kj::Promise<Outcome<void, Goal::Finished>> promise,
|
||||
bool inBuildSlot)
|
||||
{
|
||||
Child child;
|
||||
child.goal = goal;
|
||||
child.goal2 = goal.get();
|
||||
child.fds = fds;
|
||||
child.timeStarted = child.lastOutput = steady_time_point::clock::now();
|
||||
child.inBuildSlot = inBuildSlot;
|
||||
children.emplace_back(child);
|
||||
children.add(promise
|
||||
.then([this, goal](auto result) {
|
||||
if (result.has_value()) {
|
||||
handleWorkResult(goal, Goal::ContinueImmediately{});
|
||||
} else if (result.has_error()) {
|
||||
handleWorkResult(goal, std::move(result.assume_error()));
|
||||
} else {
|
||||
childException = result.assume_exception();
|
||||
}
|
||||
})
|
||||
.attach(Finally{[this, goal, inBuildSlot] {
|
||||
childTerminated(goal, inBuildSlot);
|
||||
}}));
|
||||
if (inBuildSlot) {
|
||||
switch (goal->jobCategory()) {
|
||||
case JobCategory::Substitution:
|
||||
|
@ -269,13 +288,13 @@ void Worker::childStarted(GoalPtr goal, const std::set<int> & fds,
|
|||
}
|
||||
|
||||
|
||||
void Worker::childTerminated(Goal * goal)
|
||||
void Worker::childTerminated(GoalPtr goal, bool inBuildSlot)
|
||||
{
|
||||
auto i = std::find_if(children.begin(), children.end(),
|
||||
[&](const Child & child) { return child.goal2 == goal; });
|
||||
if (i == children.end()) return;
|
||||
if (childFinished) {
|
||||
childFinished->fulfill();
|
||||
}
|
||||
|
||||
if (i->inBuildSlot) {
|
||||
if (inBuildSlot) {
|
||||
switch (goal->jobCategory()) {
|
||||
case JobCategory::Substitution:
|
||||
assert(nrSubstitutions > 0);
|
||||
|
@ -290,8 +309,6 @@ void Worker::childTerminated(Goal * goal)
|
|||
}
|
||||
}
|
||||
|
||||
children.erase(i);
|
||||
|
||||
/* Wake up goals waiting for a build slot. */
|
||||
for (auto & j : wantingToBuild) {
|
||||
GoalPtr goal = j.lock();
|
||||
|
@ -390,11 +407,15 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req)
|
|||
if (topGoals.empty()) break;
|
||||
|
||||
/* Wait for input. */
|
||||
if (!children.empty() || !waitingForAWhile.empty())
|
||||
if (!children.isEmpty() || !waitingForAWhile.empty())
|
||||
waitForInput();
|
||||
else {
|
||||
assert(!awake.empty());
|
||||
}
|
||||
|
||||
if (childException) {
|
||||
std::rethrow_exception(childException);
|
||||
}
|
||||
}
|
||||
|
||||
/* If --keep-going is not set, it's possible that the main goal
|
||||
|
@ -402,7 +423,7 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req)
|
|||
--keep-going *is* set, then they must all be finished now. */
|
||||
assert(!settings.keepGoing || awake.empty());
|
||||
assert(!settings.keepGoing || wantingToBuild.empty());
|
||||
assert(!settings.keepGoing || children.empty());
|
||||
assert(!settings.keepGoing || children.isEmpty());
|
||||
|
||||
return _topGoals;
|
||||
}
|
||||
|
@ -411,140 +432,52 @@ void Worker::waitForInput()
|
|||
{
|
||||
printMsg(lvlVomit, "waiting for children");
|
||||
|
||||
auto childFinished = [&]{
|
||||
auto pair = kj::newPromiseAndFulfiller<void>();
|
||||
this->childFinished = kj::mv(pair.fulfiller);
|
||||
return kj::mv(pair.promise);
|
||||
}();
|
||||
|
||||
/* Process output from the file descriptors attached to the
|
||||
children, namely log output and output path creation commands.
|
||||
We also use this to detect child termination: if we get EOF on
|
||||
the logger pipe of a build, we assume that the builder has
|
||||
terminated. */
|
||||
|
||||
bool useTimeout = false;
|
||||
long timeout = 0;
|
||||
std::optional<long> timeout = 0;
|
||||
auto before = steady_time_point::clock::now();
|
||||
|
||||
/* If we're monitoring for silence on stdout/stderr, or if there
|
||||
is a build timeout, then wait for input until the first
|
||||
deadline for any child. */
|
||||
auto nearest = steady_time_point::max(); // nearest deadline
|
||||
if (settings.minFree.get() != 0)
|
||||
// Periodicallty wake up to see if we need to run the garbage collector.
|
||||
nearest = before + std::chrono::seconds(10);
|
||||
for (auto & i : children) {
|
||||
if (auto goal = i.goal.lock()) {
|
||||
if (!goal->respectsTimeouts()) continue;
|
||||
if (0 != settings.maxSilentTime)
|
||||
nearest = std::min(nearest, i.lastOutput + std::chrono::seconds(settings.maxSilentTime));
|
||||
if (0 != settings.buildTimeout)
|
||||
nearest = std::min(nearest, i.timeStarted + std::chrono::seconds(settings.buildTimeout));
|
||||
}
|
||||
}
|
||||
if (nearest != steady_time_point::max()) {
|
||||
timeout = std::max(1L, (long) std::chrono::duration_cast<std::chrono::seconds>(nearest - before).count());
|
||||
useTimeout = true;
|
||||
if (settings.minFree.get() != 0) {
|
||||
timeout = 10;
|
||||
}
|
||||
|
||||
/* If we are polling goals that are waiting for a lock, then wake
|
||||
up after a few seconds at most. */
|
||||
if (!waitingForAWhile.empty()) {
|
||||
useTimeout = true;
|
||||
if (lastWokenUp == steady_time_point::min() || lastWokenUp > before) lastWokenUp = before;
|
||||
timeout = std::max(1L,
|
||||
(long) std::chrono::duration_cast<std::chrono::seconds>(
|
||||
lastWokenUp + std::chrono::seconds(settings.pollInterval) - before).count());
|
||||
} else lastWokenUp = steady_time_point::min();
|
||||
|
||||
if (useTimeout)
|
||||
vomit("sleeping %d seconds", timeout);
|
||||
if (timeout)
|
||||
vomit("sleeping %d seconds", *timeout);
|
||||
|
||||
/* Use select() to wait for the input side of any logger pipe to
|
||||
become `available'. Note that `available' (i.e., non-blocking)
|
||||
includes EOF. */
|
||||
std::vector<struct pollfd> pollStatus;
|
||||
std::map<int, size_t> fdToPollStatus;
|
||||
for (auto & i : children) {
|
||||
for (auto & j : i.fds) {
|
||||
pollStatus.push_back((struct pollfd) { .fd = j, .events = POLLIN });
|
||||
fdToPollStatus[j] = pollStatus.size() - 1;
|
||||
}
|
||||
auto waitFor = [&] {
|
||||
if (timeout) {
|
||||
return aio.provider->getTimer()
|
||||
.afterDelay(*timeout * kj::SECONDS)
|
||||
.exclusiveJoin(kj::mv(childFinished));
|
||||
} else {
|
||||
return std::move(childFinished);
|
||||
}
|
||||
}();
|
||||
|
||||
if (poll(pollStatus.data(), pollStatus.size(),
|
||||
useTimeout ? timeout * 1000 : -1) == -1) {
|
||||
if (errno == EINTR) return;
|
||||
throw SysError("waiting for input");
|
||||
}
|
||||
waitFor.wait(aio.waitScope);
|
||||
|
||||
auto after = steady_time_point::clock::now();
|
||||
|
||||
/* Process all available file descriptors. FIXME: this is
|
||||
O(children * fds). */
|
||||
decltype(children)::iterator i;
|
||||
for (auto j = children.begin(); j != children.end(); j = i) {
|
||||
i = std::next(j);
|
||||
|
||||
checkInterrupt();
|
||||
|
||||
GoalPtr goal = j->goal.lock();
|
||||
assert(goal);
|
||||
|
||||
if (!goal->exitCode.has_value() &&
|
||||
0 != settings.maxSilentTime &&
|
||||
goal->respectsTimeouts() &&
|
||||
after - j->lastOutput >= std::chrono::seconds(settings.maxSilentTime))
|
||||
{
|
||||
handleWorkResult(
|
||||
goal,
|
||||
goal->timedOut(Error(
|
||||
"%1% timed out after %2% seconds of silence",
|
||||
goal->getName(),
|
||||
settings.maxSilentTime
|
||||
))
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
else if (!goal->exitCode.has_value() &&
|
||||
0 != settings.buildTimeout &&
|
||||
goal->respectsTimeouts() &&
|
||||
after - j->timeStarted >= std::chrono::seconds(settings.buildTimeout))
|
||||
{
|
||||
handleWorkResult(
|
||||
goal,
|
||||
goal->timedOut(
|
||||
Error("%1% timed out after %2% seconds", goal->getName(), settings.buildTimeout)
|
||||
)
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
std::set<int> fds2(j->fds);
|
||||
std::vector<unsigned char> buffer(4096);
|
||||
for (auto & k : fds2) {
|
||||
const auto fdPollStatusId = get(fdToPollStatus, k);
|
||||
assert(fdPollStatusId);
|
||||
assert(*fdPollStatusId < pollStatus.size());
|
||||
if (pollStatus.at(*fdPollStatusId).revents) {
|
||||
ssize_t rd = ::read(k, buffer.data(), buffer.size());
|
||||
// FIXME: is there a cleaner way to handle pt close
|
||||
// than EIO? Is this even standard?
|
||||
if (rd == 0 || (rd == -1 && errno == EIO)) {
|
||||
debug("%1%: got EOF", goal->getName());
|
||||
goal->handleEOF(k);
|
||||
handleWorkResult(goal, Goal::ContinueImmediately{});
|
||||
j->fds.erase(k);
|
||||
} else if (rd == -1) {
|
||||
if (errno != EINTR)
|
||||
throw SysError("%s: read failed", goal->getName());
|
||||
} else {
|
||||
printMsg(lvlVomit, "%1%: read %2% bytes",
|
||||
goal->getName(), rd);
|
||||
std::string_view data(charptr_cast<char *>(buffer.data()), rd);
|
||||
j->lastOutput = after;
|
||||
handleWorkResult(goal, goal->handleChildOutput(k, data));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!waitingForAWhile.empty() && lastWokenUp + std::chrono::seconds(settings.pollInterval) <= after) {
|
||||
lastWokenUp = after;
|
||||
for (auto & i : waitingForAWhile) {
|
||||
|
|
|
@ -21,24 +21,6 @@ class DrvOutputSubstitutionGoal;
|
|||
|
||||
typedef std::chrono::time_point<std::chrono::steady_clock> steady_time_point;
|
||||
|
||||
/**
|
||||
* A mapping used to remember for each child process to what goal it
|
||||
* belongs, and file descriptors for receiving log data and output
|
||||
* path creation commands.
|
||||
*/
|
||||
struct Child
|
||||
{
|
||||
WeakGoalPtr goal;
|
||||
Goal * goal2; // ugly hackery
|
||||
std::set<int> fds;
|
||||
bool inBuildSlot;
|
||||
/**
|
||||
* Time we last got output on stdout/stderr
|
||||
*/
|
||||
steady_time_point lastOutput;
|
||||
steady_time_point timeStarted;
|
||||
};
|
||||
|
||||
/* Forward definition. */
|
||||
struct HookInstance;
|
||||
|
||||
|
@ -116,11 +98,6 @@ private:
|
|||
*/
|
||||
WeakGoals wantingToBuild;
|
||||
|
||||
/**
|
||||
* Child processes currently running.
|
||||
*/
|
||||
std::list<Child> children;
|
||||
|
||||
/**
|
||||
* Number of build slots occupied. This includes local builds but does not
|
||||
* include substitutions or remote builds via the build hook.
|
||||
|
@ -179,6 +156,8 @@ private:
|
|||
void goalFinished(GoalPtr goal, Goal::Finished & f);
|
||||
void handleWorkResult(GoalPtr goal, Goal::WorkResult how);
|
||||
|
||||
kj::Own<kj::PromiseFulfiller<void>> childFinished;
|
||||
|
||||
/**
|
||||
* Put `goal` to sleep until a build slot becomes available (which
|
||||
* might be right away).
|
||||
|
@ -212,9 +191,14 @@ private:
|
|||
* Registers a running child process. `inBuildSlot` means that
|
||||
* the process counts towards the jobs limit.
|
||||
*/
|
||||
void childStarted(GoalPtr goal, const std::set<int> & fds,
|
||||
void childStarted(GoalPtr goal, kj::Promise<Outcome<void, Goal::Finished>> promise,
|
||||
bool inBuildSlot);
|
||||
|
||||
/**
|
||||
* Unregisters a running child process.
|
||||
*/
|
||||
void childTerminated(GoalPtr goal, bool inBuildSlot);
|
||||
|
||||
/**
|
||||
* Pass current stats counters to the logger for progress bar updates.
|
||||
*/
|
||||
|
@ -240,6 +224,11 @@ public:
|
|||
Store & evalStore;
|
||||
kj::AsyncIoContext & aio;
|
||||
|
||||
private:
|
||||
kj::TaskSet children;
|
||||
std::exception_ptr childException;
|
||||
|
||||
public:
|
||||
struct HookState {
|
||||
std::unique_ptr<HookInstance> instance;
|
||||
|
||||
|
@ -302,11 +291,6 @@ private:
|
|||
GoalPtr makeGoal(const DerivedPath & req, BuildMode buildMode = bmNormal) override;
|
||||
|
||||
public:
|
||||
/**
|
||||
* Unregisters a running child process.
|
||||
*/
|
||||
void childTerminated(Goal * goal);
|
||||
|
||||
/**
|
||||
* Loop until the specified top-level goals have finished.
|
||||
*/
|
||||
|
|
Loading…
Reference in a new issue