diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index b9696733..a2b69c2b 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -60,6 +60,7 @@ static void openConnection(const string & sshName, const string & sshKey, static void copyClosureTo(std::shared_ptr store, FdSource & from, FdSink & to, const PathSet & paths, + TokenServer & copyClosureTokenServer, bool useSubstitutes = false) { PathSet closure; @@ -88,6 +89,19 @@ static void copyClosureTo(std::shared_ptr store, for (auto i = sorted.rbegin(); i != sorted.rend(); ++i) if (present.find(*i) == present.end()) missing.push_back(*i); + /* Ensure that only a limited number of threads can copy closures + at the same time. However, proceed anyway after a timeout to + prevent starvation by a handful of really huge closures. */ + time_t start = time(0); + int timeout = 60 * (10 + rand() % 5); + auto token(copyClosureTokenServer.get(timeout)); + time_t stop = time(0); + + if (token()) + printMsg(lvlDebug, format("got copy closure token after %1%s") % (stop - start)); + else + printMsg(lvlDebug, format("dit not get copy closure token after %1%s") % (stop - start)); + printMsg(lvlDebug, format("sending %1% missing paths") % missing.size()); writeInt(cmdImportPaths, to); @@ -114,6 +128,7 @@ void buildRemote(std::shared_ptr store, const string & sshName, const string & sshKey, const Path & drvPath, const Derivation & drv, const nix::Path & logDir, unsigned int maxSilentTime, unsigned int buildTimeout, + TokenServer & copyClosureTokenServer, RemoteResult & result, counter & nrStepsBuilding) { string base = baseNameOf(drvPath); @@ -163,7 +178,7 @@ void buildRemote(std::shared_ptr store, /* Copy the input closure. */ printMsg(lvlDebug, format("sending closure of ‘%1%’ to ‘%2%’") % drvPath % sshName); - copyClosureTo(store, from, to, inputs); + copyClosureTo(store, from, to, inputs, copyClosureTokenServer); autoDelete.cancel(); diff --git a/src/hydra-queue-runner/build-remote.hh b/src/hydra-queue-runner/build-remote.hh index 68d612e3..86ec767f 100644 --- a/src/hydra-queue-runner/build-remote.hh +++ b/src/hydra-queue-runner/build-remote.hh @@ -4,6 +4,7 @@ #include "derivations.hh" #include "counter.hh" +#include "token-server.hh" struct RemoteResult { @@ -22,4 +23,5 @@ void buildRemote(std::shared_ptr store, const std::string & sshName, const std::string & sshKey, const nix::Path & drvPath, const nix::Derivation & drv, const nix::Path & logDir, unsigned int maxSilentTime, unsigned int buildTimeout, + TokenServer & copyClosureTokenServer, RemoteResult & result, counter & nrStepsBuilding); diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 8a15a96e..524372f6 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -20,6 +20,7 @@ #include "sync.hh" #include "pool.hh" #include "counter.hh" +#include "token-server.hh" #include "store-api.hh" #include "derivations.hh" @@ -31,9 +32,11 @@ using namespace nix; -const int maxTries = 5; -const int retryInterval = 60; // seconds +// FIXME: Make configurable. +const unsigned int maxTries = 5; +const unsigned int retryInterval = 60; // seconds const float retryBackoff = 3.0; +const unsigned int maxParallelCopyClosure = 4; typedef std::chrono::time_point system_time; @@ -243,6 +246,10 @@ private: typedef std::list Machines; Sync machines; + /* Token server limiting the number of threads copying closures in + parallel to prevent excessive I/O load. */ + TokenServer copyClosureTokenServer{maxParallelCopyClosure}; + /* Various stats. */ time_t startedAt; counter nrBuildsRead{0}; @@ -1100,7 +1107,8 @@ bool State::doBuildStep(std::shared_ptr store, Step::ptr step, try { /* FIXME: referring builds may have conflicting timeouts. */ buildRemote(store, machine->sshName, machine->sshKey, step->drvPath, step->drv, - logDir, build->maxSilentTime, build->buildTimeout, result, nrStepsBuilding); + logDir, build->maxSilentTime, build->buildTimeout, copyClosureTokenServer, + result, nrStepsBuilding); } catch (Error & e) { result.status = RemoteResult::rrMiscFailure; result.errorMsg = e.msg(); diff --git a/src/hydra-queue-runner/sync.hh b/src/hydra-queue-runner/sync.hh index 34b97285..aadaa838 100644 --- a/src/hydra-queue-runner/sync.hh +++ b/src/hydra-queue-runner/sync.hh @@ -2,6 +2,7 @@ #include #include +#include /* This template class ensures synchronized access to a value of type T. It is used as follows: @@ -50,6 +51,15 @@ public: assert(s); cv.wait(s->mutex); } + + template + bool wait_for(std::condition_variable_any & cv, + const std::chrono::duration & duration, + Predicate pred) + { + assert(s); + return cv.wait_for(s->mutex, duration, pred); + } }; Lock lock() { return Lock(this); } diff --git a/src/hydra-queue-runner/token-server.hh b/src/hydra-queue-runner/token-server.hh new file mode 100644 index 00000000..2ff748e3 --- /dev/null +++ b/src/hydra-queue-runner/token-server.hh @@ -0,0 +1,67 @@ +#pragma once + +#include + +#include "sync.hh" + +/* This class hands out tokens. There are only ‘maxTokens’ tokens + available. Calling get() will return a Token object, representing + ownership of a token. If no token is available, get() will sleep + until another thread returns a token. */ + +class TokenServer +{ + unsigned int maxTokens; + + Sync curTokens{0}; + std::condition_variable_any wakeup; + +public: + TokenServer(unsigned int maxTokens) : maxTokens(maxTokens) { } + + class Token + { + friend TokenServer; + + TokenServer * ts; + + bool acquired = false; + + Token(TokenServer * ts, unsigned int timeout) : ts(ts) + { + auto curTokens(ts->curTokens.lock()); + while (*curTokens >= ts->maxTokens) + if (timeout) { + if (!curTokens.wait_for(ts->wakeup, std::chrono::seconds(timeout), + [&]() { return *curTokens < ts->maxTokens; })) + return; + } else + curTokens.wait(ts->wakeup); + (*curTokens)++; + acquired = true; + } + + public: + + Token(Token && t) : ts(t.ts) { t.ts = 0; } + Token(const Token & l) = delete; + + ~Token() + { + if (!ts || !acquired) return; + { + auto curTokens(ts->curTokens.lock()); + assert(*curTokens); + (*curTokens)--; + } + ts->wakeup.notify_one(); + } + + bool operator ()() { return acquired; } + }; + + Token get(unsigned int timeout = 0) + { + return Token(this, timeout); + } +};