From a51bd392a22fba5b0a0d90e2204a608b78c37ce1 Mon Sep 17 00:00:00 2001 From: Pierre Bourdon Date: Thu, 11 Apr 2024 15:03:23 +0200 Subject: [PATCH] queue-runner: limit parallelism of CPU intensive operations My current theory is that running more parallel xz than available CPU cores is reducing our overall throughput by requiring more scheduling overhead and more cache thrashing. --- src/hydra-queue-runner/build-remote.cc | 18 ++++++++++++++++++ src/hydra-queue-runner/hydra-queue-runner.cc | 1 + src/hydra-queue-runner/state.hh | 6 ++++++ src/root/build.tt | 2 ++ 4 files changed, 27 insertions(+) diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index b1629595..b4a08d7d 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -494,6 +494,16 @@ void RemoteResult::updateWithBuildResult(const nix::BuildResult & buildResult) } +/* Utility guard object to auto-release a semaphore on destruction. */ +template +class SemaphoreReleaser { +public: + SemaphoreReleaser(T* s) : sem(s) {} + ~SemaphoreReleaser() { sem->release(); } + +private: + T* sem; +}; void State::buildRemote(ref destStore, ::Machine::ptr machine, Step::ptr step, @@ -612,6 +622,14 @@ void State::buildRemote(ref destStore, result.logFile = ""; } + /* Throttle CPU-bound work. Opportunistically skip updating the current + * step, since this requires a DB roundtrip. */ + if (!localWorkThrottler.try_acquire()) { + updateStep(ssWaitingForLocalSlot); + localWorkThrottler.acquire(); + } + SemaphoreReleaser releaser(&localWorkThrottler); + StorePathSet outputs; for (auto & [_, realisation] : buildResult.builtOutputs) outputs.insert(realisation.outPath); diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index dffd8ad6..6e7a1816 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -85,6 +85,7 @@ State::State(std::optional metricsAddrOpt) : config(std::make_unique()) , maxUnsupportedTime(config->getIntOption("max_unsupported_time", 0)) , dbPool(config->getIntOption("max_db_connections", 128)) + , localWorkThrottler(config->getIntOption("max_local_worker_threads", std::min(maxSupportedLocalWorkers, std::max(4u, std::thread::hardware_concurrency()) - 2))) , maxOutputSize(config->getIntOption("max_output_size", 2ULL << 30)) , maxLogSize(config->getIntOption("max_log_size", 64ULL << 20)) , uploadLogsToBinaryCache(config->getBoolOption("upload_logs_to_binary_cache", false)) diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index d9e969e4..89a7567d 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -56,6 +57,7 @@ typedef enum { ssConnecting = 10, ssSendingInputs = 20, ssBuilding = 30, + ssWaitingForLocalSlot = 35, ssReceivingOutputs = 40, ssPostProcessing = 50, } StepState; @@ -387,6 +389,10 @@ private: typedef std::map Machines; nix::Sync machines; // FIXME: use atomic_shared_ptr + /* Throttler for CPU-bound local work. */ + static constexpr unsigned int maxSupportedLocalWorkers = 1024; + std::counting_semaphore localWorkThrottler; + /* Various stats. */ time_t startedAt; counter nrBuildsRead{0}; diff --git a/src/root/build.tt b/src/root/build.tt index 93a02e0f..b4176958 100644 --- a/src/root/build.tt +++ b/src/root/build.tt @@ -69,6 +69,8 @@ END; Sending inputs [% ELSIF step.busy == 30 %] Building + [% ELSIF step.busy == 35 %] + Waiting to receive outputs [% ELSIF step.busy == 40 %] Receiving outputs [% ELSIF step.busy == 50 %]