Remove TokenServer in preparation of making NAR copying O(1) memory

This commit is contained in:
Eelco Dolstra 2020-07-27 14:57:16 +02:00
parent e5f6fc2e4e
commit cbcf6359b4
No known key found for this signature in database
GPG key ID: 8170B4726D7198DE
6 changed files with 1 additions and 146 deletions

View file

@ -2,6 +2,6 @@ bin_PROGRAMS = hydra-queue-runner
hydra_queue_runner_SOURCES = hydra-queue-runner.cc queue-monitor.cc dispatcher.cc \ hydra_queue_runner_SOURCES = hydra-queue-runner.cc queue-monitor.cc dispatcher.cc \
builder.cc build-result.cc build-remote.cc \ builder.cc build-result.cc build-remote.cc \
build-result.hh counter.hh token-server.hh state.hh db.hh build-result.hh counter.hh state.hh db.hh
hydra_queue_runner_LDADD = $(NIX_LIBS) -lpqxx hydra_queue_runner_LDADD = $(NIX_LIBS) -lpqxx
hydra_queue_runner_CXXFLAGS = $(NIX_CFLAGS) -Wall -I ../libhydra -Wno-deprecated-declarations hydra_queue_runner_CXXFLAGS = $(NIX_CFLAGS) -Wall -I ../libhydra -Wno-deprecated-declarations

View file

@ -426,31 +426,11 @@ void State::buildRemote(ref<Store> destStore,
printMsg(lvlDebug, "copying outputs of %s from %s (%d bytes)", printMsg(lvlDebug, "copying outputs of %s from %s (%d bytes)",
localStore->printStorePath(step->drvPath), machine->sshName, totalNarSize); localStore->printStorePath(step->drvPath), machine->sshName, totalNarSize);
/* Block until we have the required amount of memory
available, which is twice the NAR size (namely the
uncompressed and worst-case compressed NAR), plus 150
MB for xz compression overhead. (The xz manpage claims
~94 MiB, but that's not was I'm seeing.) */
auto resStart = std::chrono::steady_clock::now();
size_t compressionCost = totalNarSize + 150 * 1024 * 1024;
result.tokens = std::make_unique<nix::TokenServer::Token>(memoryTokens.get(totalNarSize + compressionCost));
auto resStop = std::chrono::steady_clock::now();
auto resMs = std::chrono::duration_cast<std::chrono::milliseconds>(resStop - resStart).count();
if (resMs >= 1000)
printMsg(lvlError, "warning: had to wait %d ms for %d memory tokens for %s",
resMs, totalNarSize, localStore->printStorePath(step->drvPath));
to << cmdExportPaths << 0; to << cmdExportPaths << 0;
writeStorePaths(*localStore, to, outputs); writeStorePaths(*localStore, to, outputs);
to.flush(); to.flush();
destStore->importPaths(from, /* result.accessor, */ NoCheckSigs); destStore->importPaths(from, /* result.accessor, */ NoCheckSigs);
/* Release the tokens pertaining to NAR
compression. After this we only have the uncompressed
NAR in memory. */
result.tokens->give_back(compressionCost);
auto now2 = std::chrono::steady_clock::now(); auto now2 = std::chrono::steady_clock::now();
result.overhead += std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count(); result.overhead += std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();

View file

@ -204,8 +204,6 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
try { try {
/* FIXME: referring builds may have conflicting timeouts. */ /* FIXME: referring builds may have conflicting timeouts. */
buildRemote(destStore, machine, step, maxSilentTime, buildTimeout, repeats, result, activeStep, updateStep); buildRemote(destStore, machine, step, maxSilentTime, buildTimeout, repeats, result, activeStep, updateStep);
} catch (NoTokens & e) {
result.stepStatus = bsNarSizeLimitExceeded;
} catch (Error & e) { } catch (Error & e) {
if (activeStep->state_.lock()->cancelled) { if (activeStep->state_.lock()->cancelled) {
printInfo("marking step %d of build %d as cancelled", stepNr, buildId); printInfo("marking step %d of build %d as cancelled", stepNr, buildId);
@ -224,7 +222,6 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
} }
result.accessor = 0; result.accessor = 0;
result.tokens = 0;
} }
time_t stepStopTime = time(0); time_t stepStopTime = time(0);

View file

@ -49,14 +49,11 @@ State::State()
: config(std::make_unique<HydraConfig>()) : config(std::make_unique<HydraConfig>())
, maxUnsupportedTime(config->getIntOption("max_unsupported_time", 0)) , maxUnsupportedTime(config->getIntOption("max_unsupported_time", 0))
, dbPool(config->getIntOption("max_db_connections", 128)) , dbPool(config->getIntOption("max_db_connections", 128))
, memoryTokens(config->getIntOption("nar_buffer_size", getMemSize() / 2))
, maxOutputSize(config->getIntOption("max_output_size", 2ULL << 30)) , maxOutputSize(config->getIntOption("max_output_size", 2ULL << 30))
, maxLogSize(config->getIntOption("max_log_size", 64ULL << 20)) , maxLogSize(config->getIntOption("max_log_size", 64ULL << 20))
, uploadLogsToBinaryCache(config->getBoolOption("upload_logs_to_binary_cache", false)) , uploadLogsToBinaryCache(config->getBoolOption("upload_logs_to_binary_cache", false))
, rootsDir(config->getStrOption("gc_roots_dir", fmt("%s/gcroots/per-user/%s/hydra-roots", settings.nixStateDir, getEnvOrDie("LOGNAME")))) , rootsDir(config->getStrOption("gc_roots_dir", fmt("%s/gcroots/per-user/%s/hydra-roots", settings.nixStateDir, getEnvOrDie("LOGNAME"))))
{ {
debug("using %d bytes for the NAR buffer", memoryTokens.capacity());
hydraData = getEnvOrDie("HYDRA_DATA"); hydraData = getEnvOrDie("HYDRA_DATA");
logDir = canonPath(hydraData + "/build-logs"); logDir = canonPath(hydraData + "/build-logs");
@ -544,7 +541,6 @@ void State::dumpStatus(Connection & conn)
root.attr("dispatchTimeAvgMs", nrDispatcherWakeups == 0 ? 0.0 : (float) dispatchTimeMs / nrDispatcherWakeups); root.attr("dispatchTimeAvgMs", nrDispatcherWakeups == 0 ? 0.0 : (float) dispatchTimeMs / nrDispatcherWakeups);
root.attr("nrDbConnections", dbPool.count()); root.attr("nrDbConnections", dbPool.count());
root.attr("nrActiveDbUpdates", nrActiveDbUpdates); root.attr("nrActiveDbUpdates", nrActiveDbUpdates);
root.attr("memoryTokensInUse", memoryTokens.currentUse());
{ {
auto nested = root.object("machines"); auto nested = root.object("machines");

View file

@ -8,7 +8,6 @@
#include <queue> #include <queue>
#include "db.hh" #include "db.hh"
#include "token-server.hh"
#include "parsed-derivations.hh" #include "parsed-derivations.hh"
#include "pathlocks.hh" #include "pathlocks.hh"
@ -65,7 +64,6 @@ struct RemoteResult
time_t startTime = 0, stopTime = 0; time_t startTime = 0, stopTime = 0;
unsigned int overhead = 0; unsigned int overhead = 0;
nix::Path logFile; nix::Path logFile;
std::unique_ptr<nix::TokenServer::Token> tokens;
std::shared_ptr<nix::FSAccessor> accessor; std::shared_ptr<nix::FSAccessor> accessor;
BuildStatus buildStatus() const BuildStatus buildStatus() const
@ -410,13 +408,6 @@ private:
std::shared_ptr<nix::Store> localStore; std::shared_ptr<nix::Store> localStore;
std::shared_ptr<nix::Store> _destStore; std::shared_ptr<nix::Store> _destStore;
/* Token server to prevent threads from allocating too many big
strings concurrently while importing NARs from the build
machines. When a thread imports a NAR of size N, it will first
acquire N memory tokens, causing it to block until that many
tokens are available. */
nix::TokenServer memoryTokens;
size_t maxOutputSize; size_t maxOutputSize;
size_t maxLogSize; size_t maxLogSize;

View file

@ -1,109 +0,0 @@
#pragma once
#include <atomic>
#include "sync.hh"
#include "types.hh"
namespace nix {
MakeError(NoTokens, Error);
/* This class hands out tokens. There are only maxTokens tokens
available. Calling get(N) will return a Token object, representing
ownership of N tokens. If the requested number of tokens is
unavailable, get() will sleep until another thread returns a
token. */
class TokenServer
{
const size_t maxTokens;
Sync<size_t> inUse{0};
std::condition_variable wakeup;
public:
TokenServer(size_t maxTokens) : maxTokens(maxTokens) { }
class Token
{
friend TokenServer;
TokenServer * ts;
size_t tokens;
bool acquired = false;
Token(TokenServer * ts, size_t tokens, unsigned int timeout)
: ts(ts), tokens(tokens)
{
if (tokens >= ts->maxTokens)
throw NoTokens("requesting more tokens (%d) than exist (%d)", tokens, ts->maxTokens);
debug("acquiring %d tokens", tokens);
auto inUse(ts->inUse.lock());
while (*inUse + tokens > ts->maxTokens)
if (timeout) {
if (!inUse.wait_for(ts->wakeup, std::chrono::seconds(timeout),
[&]() { return *inUse + tokens <= ts->maxTokens; }))
return;
} else
inUse.wait(ts->wakeup);
*inUse += tokens;
acquired = true;
}
public:
Token(Token && t) : ts(t.ts), tokens(t.tokens), acquired(t.acquired)
{
t.ts = 0;
t.acquired = false;
}
Token(const Token & l) = delete;
~Token()
{
if (!ts || !acquired) return;
give_back(tokens);
}
bool operator ()() { return acquired; }
void give_back(size_t t)
{
debug("returning %d tokens", t);
if (!t) return;
assert(acquired);
assert(t <= tokens);
{
auto inUse(ts->inUse.lock());
assert(*inUse >= t);
*inUse -= t;
tokens -= t;
}
// FIXME: inefficient. Should wake up waiters that can
// proceed now.
ts->wakeup.notify_all();
}
};
Token get(size_t tokens = 1, unsigned int timeout = 0)
{
return Token(this, tokens, timeout);
}
size_t currentUse()
{
auto inUse_(inUse.lock());
return *inUse_;
}
size_t capacity()
{
return maxTokens;
}
};
}