PathSubstitutionGoal: Clean up pipe

If there were many top-level goals (which are not destroyed until the
very end), commands like

  $ nix copy --to 'ssh://localhost?remote-store=/tmp/nix' \
    /run/current-system --no-check-sigs --substitute-on-destination

could fail with "Too many open files". So now we do some explicit
cleanup from amDone(). It would be cleaner to separate goals from
their temporary internal state, but that would be a bigger refactor.
This commit is contained in:
Eelco Dolstra 2021-04-07 12:21:31 +02:00
parent 4bf3eb27e6
commit 8a29052cb2
7 changed files with 43 additions and 17 deletions

View file

@ -78,6 +78,8 @@ void Goal::amDone(ExitCode result, std::optional<Error> ex)
}
waiters.clear();
worker.removeGoal(shared_from_this());
cleanup();
}

View file

@ -100,6 +100,8 @@ struct Goal : public std::enable_shared_from_this<Goal>
virtual string key() = 0;
void amDone(ExitCode result, std::optional<Error> ex = {});
virtual void cleanup() { }
};
void addToWeakGoals(WeakGoals & goals, GoalPtr p);

View file

@ -20,15 +20,7 @@ PathSubstitutionGoal::PathSubstitutionGoal(const StorePath & storePath, Worker &
PathSubstitutionGoal::~PathSubstitutionGoal()
{
try {
if (thr.joinable()) {
// FIXME: signal worker thread to quit.
thr.join();
worker.childTerminated(this);
}
} catch (...) {
ignoreException();
}
cleanup();
}
@ -63,6 +55,8 @@ void PathSubstitutionGoal::tryNext()
{
trace("trying next substituter");
cleanup();
if (subs.size() == 0) {
/* None left. Terminate this goal and let someone else deal
with it. */
@ -205,7 +199,7 @@ void PathSubstitutionGoal::tryToRun()
thr = std::thread([this]() {
try {
/* Wake up the worker loop when we're done. */
Finally updateStats([this]() { outPipe.writeSide = -1; });
Finally updateStats([this]() { outPipe.writeSide.close(); });
Activity act(*logger, actSubstitute, Logger::Fields{worker.store.printStorePath(storePath), sub->getUri()});
PushActivity pact(act.id);
@ -288,4 +282,21 @@ void PathSubstitutionGoal::handleEOF(int fd)
if (fd == outPipe.readSide.get()) worker.wakeUp(shared_from_this());
}
void PathSubstitutionGoal::cleanup()
{
try {
if (thr.joinable()) {
// FIXME: signal worker thread to quit.
thr.join();
worker.childTerminated(this);
}
outPipe.close();
} catch (...) {
ignoreException();
}
}
}

View file

@ -14,7 +14,7 @@ struct PathSubstitutionGoal : public Goal
StorePath storePath;
/* The path the substituter refers to the path as. This will be
* different when the stores have different names. */
different when the stores have different names. */
std::optional<StorePath> subPath;
/* The remaining substituters. */
@ -79,6 +79,8 @@ public:
/* Callback used by the worker to write to the log. */
void handleChildOutput(int fd, const string & data) override;
void handleEOF(int fd) override;
void cleanup() override;
};
}

View file

@ -128,6 +128,7 @@ void Worker::removeGoal(GoalPtr goal)
nix::removeGoal(subGoal, drvOutputSubstitutionGoals);
else
assert(false);
if (topGoals.find(goal) != topGoals.end()) {
topGoals.erase(goal);
/* If a top-level goal failed, then kill all other goals

View file

@ -752,13 +752,13 @@ AutoCloseFD::AutoCloseFD() : fd{-1} {}
AutoCloseFD::AutoCloseFD(int fd) : fd{fd} {}
AutoCloseFD::AutoCloseFD(AutoCloseFD&& that) : fd{that.fd}
AutoCloseFD::AutoCloseFD(AutoCloseFD && that) : fd{that.fd}
{
that.fd = -1;
}
AutoCloseFD& AutoCloseFD::operator =(AutoCloseFD&& that)
AutoCloseFD & AutoCloseFD::operator =(AutoCloseFD && that)
{
close();
fd = that.fd;
@ -789,6 +789,7 @@ void AutoCloseFD::close()
if (::close(fd) == -1)
/* This should never happen. */
throw SysError("closing file descriptor %1%", fd);
fd = -1;
}
}
@ -822,6 +823,12 @@ void Pipe::create()
}
void Pipe::close()
{
readSide.close();
writeSide.close();
}
//////////////////////////////////////////////////////////////////////
@ -1121,7 +1128,7 @@ void runProgram2(const RunOptions & options)
throw SysError("executing '%1%'", options.program);
}, processOptions);
out.writeSide = -1;
out.writeSide.close();
std::thread writerThread;
@ -1134,7 +1141,7 @@ void runProgram2(const RunOptions & options)
if (source) {
in.readSide = -1;
in.readSide.close();
writerThread = std::thread([&]() {
try {
std::vector<char> buf(8 * 1024);
@ -1151,7 +1158,7 @@ void runProgram2(const RunOptions & options)
} catch (...) {
promise.set_exception(std::current_exception());
}
in.writeSide = -1;
in.writeSide.close();
});
}

View file

@ -188,7 +188,6 @@ public:
class AutoCloseFD
{
int fd;
void close();
public:
AutoCloseFD();
AutoCloseFD(int fd);
@ -200,6 +199,7 @@ public:
int get() const;
explicit operator bool() const;
int release();
void close();
};
@ -216,6 +216,7 @@ class Pipe
public:
AutoCloseFD readSide, writeSide;
void create();
void close();
};