forked from lix-project/hydra
hydra-queue-runner: More accurate memory accounting
We now take into account the memory necessary for compressing the NAR being exported to the binary cache, plus xz compression overhead. Also, we now release the memory tokens for the NAR accessor *after* releasing the NAR accessor. Previously the memory for the NAR accessor might still be in use while another thread does an allocation, causing the maximum to be exceeded temporarily. Also, use notify_all instead of notify_one to wake up memory token waiters. This is not very nice, but not every waiter is requesting the same number of tokens, so some might be able to proceed.
This commit is contained in:
parent
cb5e438a08
commit
b4d32a3085
4 changed files with 40 additions and 10 deletions
|
@ -373,10 +373,13 @@ void State::buildRemote(ref<Store> destStore,
|
||||||
% step->drvPath % machine->sshName % totalNarSize);
|
% step->drvPath % machine->sshName % totalNarSize);
|
||||||
|
|
||||||
/* Block until we have the required amount of memory
|
/* Block until we have the required amount of memory
|
||||||
available. FIXME: only need this for binary cache
|
available, which is twice the NAR size (namely the
|
||||||
destination stores. */
|
uncompressed and worst-case compressed NAR), plus 150
|
||||||
|
MB for xz compression overhead. (The xz manpage claims
|
||||||
|
~94 MiB, but that's not was I'm seeing.) */
|
||||||
auto resStart = std::chrono::steady_clock::now();
|
auto resStart = std::chrono::steady_clock::now();
|
||||||
auto memoryReservation(memoryTokens.get(totalNarSize));
|
size_t compressionCost = totalNarSize + 150 * 1024 * 1024;
|
||||||
|
result.tokens = std::make_unique<nix::TokenServer::Token>(memoryTokens.get(totalNarSize + compressionCost));
|
||||||
auto resStop = std::chrono::steady_clock::now();
|
auto resStop = std::chrono::steady_clock::now();
|
||||||
|
|
||||||
auto resMs = std::chrono::duration_cast<std::chrono::milliseconds>(resStop - resStart).count();
|
auto resMs = std::chrono::duration_cast<std::chrono::milliseconds>(resStop - resStart).count();
|
||||||
|
@ -390,6 +393,11 @@ void State::buildRemote(ref<Store> destStore,
|
||||||
to.flush();
|
to.flush();
|
||||||
destStore->importPaths(from, result.accessor, true);
|
destStore->importPaths(from, result.accessor, true);
|
||||||
|
|
||||||
|
/* Release the tokens pertaining to NAR
|
||||||
|
compression. After this we only have the uncompressed
|
||||||
|
NAR in memory. */
|
||||||
|
result.tokens->give_back(compressionCost);
|
||||||
|
|
||||||
auto now2 = std::chrono::steady_clock::now();
|
auto now2 = std::chrono::steady_clock::now();
|
||||||
|
|
||||||
result.overhead += std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
|
result.overhead += std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
|
||||||
|
|
|
@ -179,6 +179,9 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
|
||||||
|
|
||||||
if (result.stepStatus == bsSuccess)
|
if (result.stepStatus == bsSuccess)
|
||||||
res = getBuildOutput(destStore, ref<FSAccessor>(result.accessor), step->drv);
|
res = getBuildOutput(destStore, ref<FSAccessor>(result.accessor), step->drv);
|
||||||
|
|
||||||
|
result.accessor = 0;
|
||||||
|
result.tokens = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
time_t stepStopTime = time(0);
|
time_t stepStopTime = time(0);
|
||||||
|
|
|
@ -50,6 +50,7 @@ struct RemoteResult
|
||||||
time_t startTime = 0, stopTime = 0;
|
time_t startTime = 0, stopTime = 0;
|
||||||
unsigned int overhead = 0;
|
unsigned int overhead = 0;
|
||||||
nix::Path logFile;
|
nix::Path logFile;
|
||||||
|
std::unique_ptr<nix::TokenServer::Token> tokens;
|
||||||
std::shared_ptr<nix::FSAccessor> accessor;
|
std::shared_ptr<nix::FSAccessor> accessor;
|
||||||
|
|
||||||
BuildStatus buildStatus()
|
BuildStatus buildStatus()
|
||||||
|
|
|
@ -40,6 +40,7 @@ public:
|
||||||
{
|
{
|
||||||
if (tokens >= ts->maxTokens)
|
if (tokens >= ts->maxTokens)
|
||||||
throw NoTokens(format("requesting more tokens (%d) than exist (%d)") % tokens % ts->maxTokens);
|
throw NoTokens(format("requesting more tokens (%d) than exist (%d)") % tokens % ts->maxTokens);
|
||||||
|
debug("acquiring %d tokens", tokens);
|
||||||
auto inUse(ts->inUse.lock());
|
auto inUse(ts->inUse.lock());
|
||||||
while (*inUse + tokens > ts->maxTokens)
|
while (*inUse + tokens > ts->maxTokens)
|
||||||
if (timeout) {
|
if (timeout) {
|
||||||
|
@ -54,21 +55,38 @@ public:
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
|
||||||
Token(Token && t) : ts(t.ts) { t.ts = 0; }
|
Token(Token && t) : ts(t.ts), tokens(t.tokens), acquired(t.acquired)
|
||||||
|
{
|
||||||
|
t.ts = 0;
|
||||||
|
t.acquired = false;
|
||||||
|
}
|
||||||
Token(const Token & l) = delete;
|
Token(const Token & l) = delete;
|
||||||
|
|
||||||
~Token()
|
~Token()
|
||||||
{
|
{
|
||||||
if (!ts || !acquired) return;
|
if (!ts || !acquired) return;
|
||||||
{
|
give_back(tokens);
|
||||||
auto inUse(ts->inUse.lock());
|
|
||||||
assert(*inUse >= tokens);
|
|
||||||
*inUse -= tokens;
|
|
||||||
}
|
|
||||||
ts->wakeup.notify_one();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool operator ()() { return acquired; }
|
bool operator ()() { return acquired; }
|
||||||
|
|
||||||
|
void give_back(size_t t)
|
||||||
|
{
|
||||||
|
debug("returning %d tokens", t);
|
||||||
|
if (!t) return;
|
||||||
|
assert(acquired);
|
||||||
|
assert(t <= tokens);
|
||||||
|
{
|
||||||
|
auto inUse(ts->inUse.lock());
|
||||||
|
assert(*inUse >= t);
|
||||||
|
*inUse -= t;
|
||||||
|
tokens -= t;
|
||||||
|
}
|
||||||
|
// FIXME: inefficient. Should wake up waiters that can
|
||||||
|
// proceed now.
|
||||||
|
ts->wakeup.notify_all();
|
||||||
|
}
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
Token get(size_t tokens = 1, unsigned int timeout = 0)
|
Token get(size_t tokens = 1, unsigned int timeout = 0)
|
||||||
|
|
Loading…
Reference in a new issue