forked from lix-project/hydra
Rate-limit the number of threads copying closures at the same time
Having a hundred threads doing I/O at the same time is bad on magnetic disks because of the excessive disk seeks. So allow only 4 threads to copy closures in parallel.
This commit is contained in:
parent
a317d24b29
commit
4db7c51b5c
5 changed files with 106 additions and 4 deletions
|
@ -60,6 +60,7 @@ static void openConnection(const string & sshName, const string & sshKey,
|
||||||
|
|
||||||
static void copyClosureTo(std::shared_ptr<StoreAPI> store,
|
static void copyClosureTo(std::shared_ptr<StoreAPI> store,
|
||||||
FdSource & from, FdSink & to, const PathSet & paths,
|
FdSource & from, FdSink & to, const PathSet & paths,
|
||||||
|
TokenServer & copyClosureTokenServer,
|
||||||
bool useSubstitutes = false)
|
bool useSubstitutes = false)
|
||||||
{
|
{
|
||||||
PathSet closure;
|
PathSet closure;
|
||||||
|
@ -88,6 +89,19 @@ static void copyClosureTo(std::shared_ptr<StoreAPI> store,
|
||||||
for (auto i = sorted.rbegin(); i != sorted.rend(); ++i)
|
for (auto i = sorted.rbegin(); i != sorted.rend(); ++i)
|
||||||
if (present.find(*i) == present.end()) missing.push_back(*i);
|
if (present.find(*i) == present.end()) missing.push_back(*i);
|
||||||
|
|
||||||
|
/* Ensure that only a limited number of threads can copy closures
|
||||||
|
at the same time. However, proceed anyway after a timeout to
|
||||||
|
prevent starvation by a handful of really huge closures. */
|
||||||
|
time_t start = time(0);
|
||||||
|
int timeout = 60 * (10 + rand() % 5);
|
||||||
|
auto token(copyClosureTokenServer.get(timeout));
|
||||||
|
time_t stop = time(0);
|
||||||
|
|
||||||
|
if (token())
|
||||||
|
printMsg(lvlDebug, format("got copy closure token after %1%s") % (stop - start));
|
||||||
|
else
|
||||||
|
printMsg(lvlDebug, format("dit not get copy closure token after %1%s") % (stop - start));
|
||||||
|
|
||||||
printMsg(lvlDebug, format("sending %1% missing paths") % missing.size());
|
printMsg(lvlDebug, format("sending %1% missing paths") % missing.size());
|
||||||
|
|
||||||
writeInt(cmdImportPaths, to);
|
writeInt(cmdImportPaths, to);
|
||||||
|
@ -114,6 +128,7 @@ void buildRemote(std::shared_ptr<StoreAPI> store,
|
||||||
const string & sshName, const string & sshKey,
|
const string & sshName, const string & sshKey,
|
||||||
const Path & drvPath, const Derivation & drv,
|
const Path & drvPath, const Derivation & drv,
|
||||||
const nix::Path & logDir, unsigned int maxSilentTime, unsigned int buildTimeout,
|
const nix::Path & logDir, unsigned int maxSilentTime, unsigned int buildTimeout,
|
||||||
|
TokenServer & copyClosureTokenServer,
|
||||||
RemoteResult & result, counter & nrStepsBuilding)
|
RemoteResult & result, counter & nrStepsBuilding)
|
||||||
{
|
{
|
||||||
string base = baseNameOf(drvPath);
|
string base = baseNameOf(drvPath);
|
||||||
|
@ -163,7 +178,7 @@ void buildRemote(std::shared_ptr<StoreAPI> store,
|
||||||
|
|
||||||
/* Copy the input closure. */
|
/* Copy the input closure. */
|
||||||
printMsg(lvlDebug, format("sending closure of ‘%1%’ to ‘%2%’") % drvPath % sshName);
|
printMsg(lvlDebug, format("sending closure of ‘%1%’ to ‘%2%’") % drvPath % sshName);
|
||||||
copyClosureTo(store, from, to, inputs);
|
copyClosureTo(store, from, to, inputs, copyClosureTokenServer);
|
||||||
|
|
||||||
autoDelete.cancel();
|
autoDelete.cancel();
|
||||||
|
|
||||||
|
|
|
@ -4,6 +4,7 @@
|
||||||
#include "derivations.hh"
|
#include "derivations.hh"
|
||||||
|
|
||||||
#include "counter.hh"
|
#include "counter.hh"
|
||||||
|
#include "token-server.hh"
|
||||||
|
|
||||||
struct RemoteResult
|
struct RemoteResult
|
||||||
{
|
{
|
||||||
|
@ -22,4 +23,5 @@ void buildRemote(std::shared_ptr<nix::StoreAPI> store,
|
||||||
const std::string & sshName, const std::string & sshKey,
|
const std::string & sshName, const std::string & sshKey,
|
||||||
const nix::Path & drvPath, const nix::Derivation & drv,
|
const nix::Path & drvPath, const nix::Derivation & drv,
|
||||||
const nix::Path & logDir, unsigned int maxSilentTime, unsigned int buildTimeout,
|
const nix::Path & logDir, unsigned int maxSilentTime, unsigned int buildTimeout,
|
||||||
|
TokenServer & copyClosureTokenServer,
|
||||||
RemoteResult & result, counter & nrStepsBuilding);
|
RemoteResult & result, counter & nrStepsBuilding);
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
#include "sync.hh"
|
#include "sync.hh"
|
||||||
#include "pool.hh"
|
#include "pool.hh"
|
||||||
#include "counter.hh"
|
#include "counter.hh"
|
||||||
|
#include "token-server.hh"
|
||||||
|
|
||||||
#include "store-api.hh"
|
#include "store-api.hh"
|
||||||
#include "derivations.hh"
|
#include "derivations.hh"
|
||||||
|
@ -31,9 +32,11 @@
|
||||||
using namespace nix;
|
using namespace nix;
|
||||||
|
|
||||||
|
|
||||||
const int maxTries = 5;
|
// FIXME: Make configurable.
|
||||||
const int retryInterval = 60; // seconds
|
const unsigned int maxTries = 5;
|
||||||
|
const unsigned int retryInterval = 60; // seconds
|
||||||
const float retryBackoff = 3.0;
|
const float retryBackoff = 3.0;
|
||||||
|
const unsigned int maxParallelCopyClosure = 4;
|
||||||
|
|
||||||
|
|
||||||
typedef std::chrono::time_point<std::chrono::system_clock> system_time;
|
typedef std::chrono::time_point<std::chrono::system_clock> system_time;
|
||||||
|
@ -243,6 +246,10 @@ private:
|
||||||
typedef std::list<Machine::ptr> Machines;
|
typedef std::list<Machine::ptr> Machines;
|
||||||
Sync<Machines> machines;
|
Sync<Machines> machines;
|
||||||
|
|
||||||
|
/* Token server limiting the number of threads copying closures in
|
||||||
|
parallel to prevent excessive I/O load. */
|
||||||
|
TokenServer copyClosureTokenServer{maxParallelCopyClosure};
|
||||||
|
|
||||||
/* Various stats. */
|
/* Various stats. */
|
||||||
time_t startedAt;
|
time_t startedAt;
|
||||||
counter nrBuildsRead{0};
|
counter nrBuildsRead{0};
|
||||||
|
@ -1100,7 +1107,8 @@ bool State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
|
||||||
try {
|
try {
|
||||||
/* FIXME: referring builds may have conflicting timeouts. */
|
/* FIXME: referring builds may have conflicting timeouts. */
|
||||||
buildRemote(store, machine->sshName, machine->sshKey, step->drvPath, step->drv,
|
buildRemote(store, machine->sshName, machine->sshKey, step->drvPath, step->drv,
|
||||||
logDir, build->maxSilentTime, build->buildTimeout, result, nrStepsBuilding);
|
logDir, build->maxSilentTime, build->buildTimeout, copyClosureTokenServer,
|
||||||
|
result, nrStepsBuilding);
|
||||||
} catch (Error & e) {
|
} catch (Error & e) {
|
||||||
result.status = RemoteResult::rrMiscFailure;
|
result.status = RemoteResult::rrMiscFailure;
|
||||||
result.errorMsg = e.msg();
|
result.errorMsg = e.msg();
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
|
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
|
#include <cassert>
|
||||||
|
|
||||||
/* This template class ensures synchronized access to a value of type
|
/* This template class ensures synchronized access to a value of type
|
||||||
T. It is used as follows:
|
T. It is used as follows:
|
||||||
|
@ -50,6 +51,15 @@ public:
|
||||||
assert(s);
|
assert(s);
|
||||||
cv.wait(s->mutex);
|
cv.wait(s->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template<class Rep, class Period, class Predicate>
|
||||||
|
bool wait_for(std::condition_variable_any & cv,
|
||||||
|
const std::chrono::duration<Rep, Period> & duration,
|
||||||
|
Predicate pred)
|
||||||
|
{
|
||||||
|
assert(s);
|
||||||
|
return cv.wait_for(s->mutex, duration, pred);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
Lock lock() { return Lock(this); }
|
Lock lock() { return Lock(this); }
|
||||||
|
|
67
src/hydra-queue-runner/token-server.hh
Normal file
67
src/hydra-queue-runner/token-server.hh
Normal file
|
@ -0,0 +1,67 @@
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
|
||||||
|
#include "sync.hh"
|
||||||
|
|
||||||
|
/* This class hands out tokens. There are only ‘maxTokens’ tokens
|
||||||
|
available. Calling get() will return a Token object, representing
|
||||||
|
ownership of a token. If no token is available, get() will sleep
|
||||||
|
until another thread returns a token. */
|
||||||
|
|
||||||
|
class TokenServer
|
||||||
|
{
|
||||||
|
unsigned int maxTokens;
|
||||||
|
|
||||||
|
Sync<unsigned int> curTokens{0};
|
||||||
|
std::condition_variable_any wakeup;
|
||||||
|
|
||||||
|
public:
|
||||||
|
TokenServer(unsigned int maxTokens) : maxTokens(maxTokens) { }
|
||||||
|
|
||||||
|
class Token
|
||||||
|
{
|
||||||
|
friend TokenServer;
|
||||||
|
|
||||||
|
TokenServer * ts;
|
||||||
|
|
||||||
|
bool acquired = false;
|
||||||
|
|
||||||
|
Token(TokenServer * ts, unsigned int timeout) : ts(ts)
|
||||||
|
{
|
||||||
|
auto curTokens(ts->curTokens.lock());
|
||||||
|
while (*curTokens >= ts->maxTokens)
|
||||||
|
if (timeout) {
|
||||||
|
if (!curTokens.wait_for(ts->wakeup, std::chrono::seconds(timeout),
|
||||||
|
[&]() { return *curTokens < ts->maxTokens; }))
|
||||||
|
return;
|
||||||
|
} else
|
||||||
|
curTokens.wait(ts->wakeup);
|
||||||
|
(*curTokens)++;
|
||||||
|
acquired = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public:
|
||||||
|
|
||||||
|
Token(Token && t) : ts(t.ts) { t.ts = 0; }
|
||||||
|
Token(const Token & l) = delete;
|
||||||
|
|
||||||
|
~Token()
|
||||||
|
{
|
||||||
|
if (!ts || !acquired) return;
|
||||||
|
{
|
||||||
|
auto curTokens(ts->curTokens.lock());
|
||||||
|
assert(*curTokens);
|
||||||
|
(*curTokens)--;
|
||||||
|
}
|
||||||
|
ts->wakeup.notify_one();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool operator ()() { return acquired; }
|
||||||
|
};
|
||||||
|
|
||||||
|
Token get(unsigned int timeout = 0)
|
||||||
|
{
|
||||||
|
return Token(this, timeout);
|
||||||
|
}
|
||||||
|
};
|
Loading…
Reference in a new issue