queue-runner: limit parallelism of CPU intensive operations

My current theory is that running more parallel xz than available CPU
cores is reducing our overall throughput by requiring more scheduling
overhead and more cache thrashing.
This commit is contained in:
Pierre Bourdon 2024-04-11 15:03:23 +02:00
parent a596d6c3c1
commit a51bd392a2
Signed by untrusted user: delroth
GPG key ID: 6FB80DCD84DA0F1C
4 changed files with 27 additions and 0 deletions

View file

@ -494,6 +494,16 @@ void RemoteResult::updateWithBuildResult(const nix::BuildResult & buildResult)
} }
/* Utility guard object to auto-release a semaphore on destruction. */
template <typename T>
class SemaphoreReleaser {
public:
SemaphoreReleaser(T* s) : sem(s) {}
~SemaphoreReleaser() { sem->release(); }
private:
T* sem;
};
void State::buildRemote(ref<Store> destStore, void State::buildRemote(ref<Store> destStore,
::Machine::ptr machine, Step::ptr step, ::Machine::ptr machine, Step::ptr step,
@ -612,6 +622,14 @@ void State::buildRemote(ref<Store> destStore,
result.logFile = ""; result.logFile = "";
} }
/* Throttle CPU-bound work. Opportunistically skip updating the current
* step, since this requires a DB roundtrip. */
if (!localWorkThrottler.try_acquire()) {
updateStep(ssWaitingForLocalSlot);
localWorkThrottler.acquire();
}
SemaphoreReleaser releaser(&localWorkThrottler);
StorePathSet outputs; StorePathSet outputs;
for (auto & [_, realisation] : buildResult.builtOutputs) for (auto & [_, realisation] : buildResult.builtOutputs)
outputs.insert(realisation.outPath); outputs.insert(realisation.outPath);

View file

@ -85,6 +85,7 @@ State::State(std::optional<std::string> metricsAddrOpt)
: config(std::make_unique<HydraConfig>()) : config(std::make_unique<HydraConfig>())
, maxUnsupportedTime(config->getIntOption("max_unsupported_time", 0)) , maxUnsupportedTime(config->getIntOption("max_unsupported_time", 0))
, dbPool(config->getIntOption("max_db_connections", 128)) , dbPool(config->getIntOption("max_db_connections", 128))
, localWorkThrottler(config->getIntOption("max_local_worker_threads", std::min(maxSupportedLocalWorkers, std::max(4u, std::thread::hardware_concurrency()) - 2)))
, maxOutputSize(config->getIntOption("max_output_size", 2ULL << 30)) , maxOutputSize(config->getIntOption("max_output_size", 2ULL << 30))
, maxLogSize(config->getIntOption("max_log_size", 64ULL << 20)) , maxLogSize(config->getIntOption("max_log_size", 64ULL << 20))
, uploadLogsToBinaryCache(config->getBoolOption("upload_logs_to_binary_cache", false)) , uploadLogsToBinaryCache(config->getBoolOption("upload_logs_to_binary_cache", false))

View file

@ -7,6 +7,7 @@
#include <memory> #include <memory>
#include <queue> #include <queue>
#include <regex> #include <regex>
#include <semaphore>
#include <prometheus/counter.h> #include <prometheus/counter.h>
#include <prometheus/gauge.h> #include <prometheus/gauge.h>
@ -56,6 +57,7 @@ typedef enum {
ssConnecting = 10, ssConnecting = 10,
ssSendingInputs = 20, ssSendingInputs = 20,
ssBuilding = 30, ssBuilding = 30,
ssWaitingForLocalSlot = 35,
ssReceivingOutputs = 40, ssReceivingOutputs = 40,
ssPostProcessing = 50, ssPostProcessing = 50,
} StepState; } StepState;
@ -387,6 +389,10 @@ private:
typedef std::map<std::string, Machine::ptr> Machines; typedef std::map<std::string, Machine::ptr> Machines;
nix::Sync<Machines> machines; // FIXME: use atomic_shared_ptr nix::Sync<Machines> machines; // FIXME: use atomic_shared_ptr
/* Throttler for CPU-bound local work. */
static constexpr unsigned int maxSupportedLocalWorkers = 1024;
std::counting_semaphore<maxSupportedLocalWorkers> localWorkThrottler;
/* Various stats. */ /* Various stats. */
time_t startedAt; time_t startedAt;
counter nrBuildsRead{0}; counter nrBuildsRead{0};

View file

@ -69,6 +69,8 @@ END;
<strong>Sending inputs</strong> <strong>Sending inputs</strong>
[% ELSIF step.busy == 30 %] [% ELSIF step.busy == 30 %]
<strong>Building</strong> <strong>Building</strong>
[% ELSIF step.busy == 35 %]
<strong>Waiting to receive outputs</strong>
[% ELSIF step.busy == 40 %] [% ELSIF step.busy == 40 %]
<strong>Receiving outputs</strong> <strong>Receiving outputs</strong>
[% ELSIF step.busy == 50 %] [% ELSIF step.busy == 50 %]