From 9127f5bbc31e3f8c156f06fee26e1e0373af2900 Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Wed, 9 Mar 2016 14:30:13 +0100 Subject: [PATCH] hydra-queue-runner: Limit memory usage When using a binary cache store, the queue runner receives NARs from the build machines, compresses them, and uploads them to the cache. However, keeping multiple large NARs in memory can cause the queue runner to run out of memory. This can happen for instance when it's processing multiple ISO images concurrently. The fix is to use a TokenServer to prevent the builder threads to store more than a certain total size of NARs concurrently (at the moment, this is hard-coded at 4 GiB). Builder threads that cause the limit to be exceeded will block until other threads have finished. The 4 GiB limit does not include certain other allocations, such as for xz compression or for FSAccessor::readFile(). But since these are unlikely to be more than the size of the NARs and hydra.nixos.org has 32 GiB RAM, it should be fine. --- src/hydra-queue-runner/build-remote.cc | 35 +++++++++++-- src/hydra-queue-runner/hydra-queue-runner.cc | 5 ++ src/hydra-queue-runner/state.hh | 16 ++++-- src/hydra-queue-runner/token-server.hh | 55 +++++++++++++------- 4 files changed, 84 insertions(+), 27 deletions(-) diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index ef3ca2e3..3c9a8f79 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -283,16 +283,43 @@ void State::buildRemote(ref destStore, /* Copy the output paths. */ if (/* machine->sshName != "localhost" */ true) { - printMsg(lvlDebug, format("copying outputs of ‘%1%’ from ‘%2%’") % step->drvPath % machine->sshName); + MaintainCount mc(nrStepsCopyingFrom); + + auto now1 = std::chrono::steady_clock::now(); + PathSet outputs; for (auto & output : step->drv.outputs) outputs.insert(output.second.path); - MaintainCount mc(nrStepsCopyingFrom); + + /* Query the size of the output paths. */ + size_t totalNarSize = 0; + to << cmdQueryPathInfos << outputs; + to.flush(); + while (true) { + if (readString(from) == "") break; + readString(from); // deriver + readStrings(from); // references + readLongLong(from); // download size + totalNarSize += readLongLong(from); + } + + printMsg(lvlDebug, format("copying outputs of ‘%s’ from ‘%s’ (%d bytes)") + % step->drvPath % machine->sshName % totalNarSize); + + /* Block until we have the required amount of memory + available. FIXME: only need this for binary cache + destination stores. */ + auto resStart = std::chrono::steady_clock::now(); + auto memoryReservation(memoryTokens.get(totalNarSize)); + auto resStop = std::chrono::steady_clock::now(); + + auto resMs = std::chrono::duration_cast(resStop - resStart).count(); + if (resMs >= 1000) + printMsg(lvlError, format("warning: had to wait %d ms for %d memory tokens for %s") + % resMs % totalNarSize % step->drvPath); result.accessor = destStore->getFSAccessor(); - auto now1 = std::chrono::steady_clock::now(); - to << cmdExportPaths << 0 << outputs; to.flush(); destStore->importPaths(false, from, result.accessor); diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index a4b4595b..2a21e647 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -17,6 +17,7 @@ using namespace nix; State::State() + : memoryTokens(4ULL << 30) // FIXME: make this configurable { hydraData = getEnv("HYDRA_DATA"); if (hydraData == "") throw Error("$HYDRA_DATA must be set"); @@ -567,6 +568,8 @@ void State::dumpStatus(Connection & conn, bool log) root.attr("dispatchTimeAvgMs", nrDispatcherWakeups == 0 ? 0.0 : (float) dispatchTimeMs / nrDispatcherWakeups); root.attr("nrDbConnections", dbPool.count()); root.attr("nrActiveDbUpdates", nrActiveDbUpdates); + root.attr("memoryTokensInUse", memoryTokens.currentUse()); + { root.attr("machines"); JSONObject nested(out); @@ -589,6 +592,7 @@ void State::dumpStatus(Connection & conn, bool log) } } } + { root.attr("jobsets"); JSONObject nested(out); @@ -600,6 +604,7 @@ void State::dumpStatus(Connection & conn, bool log) nested2.attr("seconds", jobset.second->getSeconds()); } } + { root.attr("machineTypes"); JSONObject nested(out); diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index d6ad1ccb..ab70b52f 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -9,14 +9,13 @@ #include "db.hh" #include "counter.hh" +#include "token-server.hh" + +#include "derivations.hh" #include "pathlocks.hh" #include "pool.hh" -#include "sync.hh" - #include "store-api.hh" -#include "derivations.hh" - -#include "binary-cache-store.hh" // FIXME +#include "sync.hh" typedef unsigned int BuildID; @@ -354,6 +353,13 @@ private: std::shared_ptr _localStore; std::shared_ptr _destStore; + /* Token server to prevent threads from allocating too many big + strings concurrently while importing NARs from the build + machines. When a thread imports a NAR of size N, it will first + acquire N memory tokens, causing it to block until that many + tokens are available. */ + nix::TokenServer memoryTokens; + public: State(); diff --git a/src/hydra-queue-runner/token-server.hh b/src/hydra-queue-runner/token-server.hh index d4f5f843..cdd03b5e 100644 --- a/src/hydra-queue-runner/token-server.hh +++ b/src/hydra-queue-runner/token-server.hh @@ -3,21 +3,27 @@ #include #include "sync.hh" +#include "types.hh" + +namespace nix { + +MakeError(NoTokens, Error) /* This class hands out tokens. There are only ‘maxTokens’ tokens - available. Calling get() will return a Token object, representing - ownership of a token. If no token is available, get() will sleep - until another thread returns a token. */ + available. Calling get(N) will return a Token object, representing + ownership of N tokens. If the requested number of tokens is + unavailable, get() will sleep until another thread returns a + token. */ class TokenServer { - unsigned int maxTokens; + const size_t maxTokens; - Sync curTokens{0}; + Sync inUse{0}; std::condition_variable wakeup; public: - TokenServer(unsigned int maxTokens) : maxTokens(maxTokens) { } + TokenServer(size_t maxTokens) : maxTokens(maxTokens) { } class Token { @@ -25,19 +31,24 @@ public: TokenServer * ts; + size_t tokens; + bool acquired = false; - Token(TokenServer * ts, unsigned int timeout) : ts(ts) + Token(TokenServer * ts, size_t tokens, unsigned int timeout) + : ts(ts), tokens(tokens) { - auto curTokens(ts->curTokens.lock()); - while (*curTokens >= ts->maxTokens) + if (tokens >= ts->maxTokens) + throw NoTokens(format("requesting more tokens (%d) than exist (%d)") % tokens); + auto inUse(ts->inUse.lock()); + while (*inUse + tokens > ts->maxTokens) if (timeout) { - if (!curTokens.wait_for(ts->wakeup, std::chrono::seconds(timeout), - [&]() { return *curTokens < ts->maxTokens; })) + if (!inUse.wait_for(ts->wakeup, std::chrono::seconds(timeout), + [&]() { return *inUse + tokens <= ts->maxTokens; })) return; } else - curTokens.wait(ts->wakeup); - (*curTokens)++; + inUse.wait(ts->wakeup); + *inUse += tokens; acquired = true; } @@ -50,9 +61,9 @@ public: { if (!ts || !acquired) return; { - auto curTokens(ts->curTokens.lock()); - assert(*curTokens); - (*curTokens)--; + auto inUse(ts->inUse.lock()); + assert(*inUse >= tokens); + *inUse -= tokens; } ts->wakeup.notify_one(); } @@ -60,8 +71,16 @@ public: bool operator ()() { return acquired; } }; - Token get(unsigned int timeout = 0) + Token get(size_t tokens = 1, unsigned int timeout = 0) { - return Token(this, timeout); + return Token(this, tokens, timeout); + } + + size_t currentUse() + { + auto inUse_(inUse.lock()); + return *inUse_; } }; + +}