hydra-queue-runner: Limit memory usage

When using a binary cache store, the queue runner receives NARs from
the build machines, compresses them, and uploads them to the
cache. However, keeping multiple large NARs in memory can cause the
queue runner to run out of memory. This can happen for instance when
it's processing multiple ISO images concurrently.

The fix is to use a TokenServer to prevent the builder threads to
store more than a certain total size of NARs concurrently (at the
moment, this is hard-coded at 4 GiB). Builder threads that cause the
limit to be exceeded will block until other threads have finished.

The 4 GiB limit does not include certain other allocations, such as
for xz compression or for FSAccessor::readFile(). But since these are
unlikely to be more than the size of the NARs and hydra.nixos.org has
32 GiB RAM, it should be fine.
This commit is contained in:
Eelco Dolstra 2016-03-09 14:30:13 +01:00
parent 49a4639377
commit 9127f5bbc3
4 changed files with 84 additions and 27 deletions

View file

@ -283,16 +283,43 @@ void State::buildRemote(ref<Store> destStore,
/* Copy the output paths. */ /* Copy the output paths. */
if (/* machine->sshName != "localhost" */ true) { if (/* machine->sshName != "localhost" */ true) {
printMsg(lvlDebug, format("copying outputs of %1% from %2%") % step->drvPath % machine->sshName); MaintainCount mc(nrStepsCopyingFrom);
auto now1 = std::chrono::steady_clock::now();
PathSet outputs; PathSet outputs;
for (auto & output : step->drv.outputs) for (auto & output : step->drv.outputs)
outputs.insert(output.second.path); outputs.insert(output.second.path);
MaintainCount mc(nrStepsCopyingFrom);
/* Query the size of the output paths. */
size_t totalNarSize = 0;
to << cmdQueryPathInfos << outputs;
to.flush();
while (true) {
if (readString(from) == "") break;
readString(from); // deriver
readStrings<PathSet>(from); // references
readLongLong(from); // download size
totalNarSize += readLongLong(from);
}
printMsg(lvlDebug, format("copying outputs of %s from %s (%d bytes)")
% step->drvPath % machine->sshName % totalNarSize);
/* Block until we have the required amount of memory
available. FIXME: only need this for binary cache
destination stores. */
auto resStart = std::chrono::steady_clock::now();
auto memoryReservation(memoryTokens.get(totalNarSize));
auto resStop = std::chrono::steady_clock::now();
auto resMs = std::chrono::duration_cast<std::chrono::milliseconds>(resStop - resStart).count();
if (resMs >= 1000)
printMsg(lvlError, format("warning: had to wait %d ms for %d memory tokens for %s")
% resMs % totalNarSize % step->drvPath);
result.accessor = destStore->getFSAccessor(); result.accessor = destStore->getFSAccessor();
auto now1 = std::chrono::steady_clock::now();
to << cmdExportPaths << 0 << outputs; to << cmdExportPaths << 0 << outputs;
to.flush(); to.flush();
destStore->importPaths(false, from, result.accessor); destStore->importPaths(false, from, result.accessor);

View file

@ -17,6 +17,7 @@ using namespace nix;
State::State() State::State()
: memoryTokens(4ULL << 30) // FIXME: make this configurable
{ {
hydraData = getEnv("HYDRA_DATA"); hydraData = getEnv("HYDRA_DATA");
if (hydraData == "") throw Error("$HYDRA_DATA must be set"); if (hydraData == "") throw Error("$HYDRA_DATA must be set");
@ -567,6 +568,8 @@ void State::dumpStatus(Connection & conn, bool log)
root.attr("dispatchTimeAvgMs", nrDispatcherWakeups == 0 ? 0.0 : (float) dispatchTimeMs / nrDispatcherWakeups); root.attr("dispatchTimeAvgMs", nrDispatcherWakeups == 0 ? 0.0 : (float) dispatchTimeMs / nrDispatcherWakeups);
root.attr("nrDbConnections", dbPool.count()); root.attr("nrDbConnections", dbPool.count());
root.attr("nrActiveDbUpdates", nrActiveDbUpdates); root.attr("nrActiveDbUpdates", nrActiveDbUpdates);
root.attr("memoryTokensInUse", memoryTokens.currentUse());
{ {
root.attr("machines"); root.attr("machines");
JSONObject nested(out); JSONObject nested(out);
@ -589,6 +592,7 @@ void State::dumpStatus(Connection & conn, bool log)
} }
} }
} }
{ {
root.attr("jobsets"); root.attr("jobsets");
JSONObject nested(out); JSONObject nested(out);
@ -600,6 +604,7 @@ void State::dumpStatus(Connection & conn, bool log)
nested2.attr("seconds", jobset.second->getSeconds()); nested2.attr("seconds", jobset.second->getSeconds());
} }
} }
{ {
root.attr("machineTypes"); root.attr("machineTypes");
JSONObject nested(out); JSONObject nested(out);

View file

@ -9,14 +9,13 @@
#include "db.hh" #include "db.hh"
#include "counter.hh" #include "counter.hh"
#include "token-server.hh"
#include "derivations.hh"
#include "pathlocks.hh" #include "pathlocks.hh"
#include "pool.hh" #include "pool.hh"
#include "sync.hh"
#include "store-api.hh" #include "store-api.hh"
#include "derivations.hh" #include "sync.hh"
#include "binary-cache-store.hh" // FIXME
typedef unsigned int BuildID; typedef unsigned int BuildID;
@ -354,6 +353,13 @@ private:
std::shared_ptr<nix::Store> _localStore; std::shared_ptr<nix::Store> _localStore;
std::shared_ptr<nix::Store> _destStore; std::shared_ptr<nix::Store> _destStore;
/* Token server to prevent threads from allocating too many big
strings concurrently while importing NARs from the build
machines. When a thread imports a NAR of size N, it will first
acquire N memory tokens, causing it to block until that many
tokens are available. */
nix::TokenServer memoryTokens;
public: public:
State(); State();

View file

@ -3,21 +3,27 @@
#include <atomic> #include <atomic>
#include "sync.hh" #include "sync.hh"
#include "types.hh"
namespace nix {
MakeError(NoTokens, Error)
/* This class hands out tokens. There are only maxTokens tokens /* This class hands out tokens. There are only maxTokens tokens
available. Calling get() will return a Token object, representing available. Calling get(N) will return a Token object, representing
ownership of a token. If no token is available, get() will sleep ownership of N tokens. If the requested number of tokens is
until another thread returns a token. */ unavailable, get() will sleep until another thread returns a
token. */
class TokenServer class TokenServer
{ {
unsigned int maxTokens; const size_t maxTokens;
Sync<unsigned int> curTokens{0}; Sync<size_t> inUse{0};
std::condition_variable wakeup; std::condition_variable wakeup;
public: public:
TokenServer(unsigned int maxTokens) : maxTokens(maxTokens) { } TokenServer(size_t maxTokens) : maxTokens(maxTokens) { }
class Token class Token
{ {
@ -25,19 +31,24 @@ public:
TokenServer * ts; TokenServer * ts;
size_t tokens;
bool acquired = false; bool acquired = false;
Token(TokenServer * ts, unsigned int timeout) : ts(ts) Token(TokenServer * ts, size_t tokens, unsigned int timeout)
: ts(ts), tokens(tokens)
{ {
auto curTokens(ts->curTokens.lock()); if (tokens >= ts->maxTokens)
while (*curTokens >= ts->maxTokens) throw NoTokens(format("requesting more tokens (%d) than exist (%d)") % tokens);
auto inUse(ts->inUse.lock());
while (*inUse + tokens > ts->maxTokens)
if (timeout) { if (timeout) {
if (!curTokens.wait_for(ts->wakeup, std::chrono::seconds(timeout), if (!inUse.wait_for(ts->wakeup, std::chrono::seconds(timeout),
[&]() { return *curTokens < ts->maxTokens; })) [&]() { return *inUse + tokens <= ts->maxTokens; }))
return; return;
} else } else
curTokens.wait(ts->wakeup); inUse.wait(ts->wakeup);
(*curTokens)++; *inUse += tokens;
acquired = true; acquired = true;
} }
@ -50,9 +61,9 @@ public:
{ {
if (!ts || !acquired) return; if (!ts || !acquired) return;
{ {
auto curTokens(ts->curTokens.lock()); auto inUse(ts->inUse.lock());
assert(*curTokens); assert(*inUse >= tokens);
(*curTokens)--; *inUse -= tokens;
} }
ts->wakeup.notify_one(); ts->wakeup.notify_one();
} }
@ -60,8 +71,16 @@ public:
bool operator ()() { return acquired; } bool operator ()() { return acquired; }
}; };
Token get(unsigned int timeout = 0) Token get(size_t tokens = 1, unsigned int timeout = 0)
{ {
return Token(this, timeout); return Token(this, tokens, timeout);
}
size_t currentUse()
{
auto inUse_(inUse.lock());
return *inUse_;
} }
}; };
}