diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index b1629595..b4a08d7d 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -494,6 +494,16 @@ void RemoteResult::updateWithBuildResult(const nix::BuildResult & buildResult) } +/* Utility guard object to auto-release a semaphore on destruction. */ +template +class SemaphoreReleaser { +public: + SemaphoreReleaser(T* s) : sem(s) {} + ~SemaphoreReleaser() { sem->release(); } + +private: + T* sem; +}; void State::buildRemote(ref destStore, ::Machine::ptr machine, Step::ptr step, @@ -612,6 +622,14 @@ void State::buildRemote(ref destStore, result.logFile = ""; } + /* Throttle CPU-bound work. Opportunistically skip updating the current + * step, since this requires a DB roundtrip. */ + if (!localWorkThrottler.try_acquire()) { + updateStep(ssWaitingForLocalSlot); + localWorkThrottler.acquire(); + } + SemaphoreReleaser releaser(&localWorkThrottler); + StorePathSet outputs; for (auto & [_, realisation] : buildResult.builtOutputs) outputs.insert(realisation.outPath); diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index dffd8ad6..6e7a1816 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -85,6 +85,7 @@ State::State(std::optional metricsAddrOpt) : config(std::make_unique()) , maxUnsupportedTime(config->getIntOption("max_unsupported_time", 0)) , dbPool(config->getIntOption("max_db_connections", 128)) + , localWorkThrottler(config->getIntOption("max_local_worker_threads", std::min(maxSupportedLocalWorkers, std::max(4u, std::thread::hardware_concurrency()) - 2))) , maxOutputSize(config->getIntOption("max_output_size", 2ULL << 30)) , maxLogSize(config->getIntOption("max_log_size", 64ULL << 20)) , uploadLogsToBinaryCache(config->getBoolOption("upload_logs_to_binary_cache", false)) diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index d9e969e4..89a7567d 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -56,6 +57,7 @@ typedef enum { ssConnecting = 10, ssSendingInputs = 20, ssBuilding = 30, + ssWaitingForLocalSlot = 35, ssReceivingOutputs = 40, ssPostProcessing = 50, } StepState; @@ -387,6 +389,10 @@ private: typedef std::map Machines; nix::Sync machines; // FIXME: use atomic_shared_ptr + /* Throttler for CPU-bound local work. */ + static constexpr unsigned int maxSupportedLocalWorkers = 1024; + std::counting_semaphore localWorkThrottler; + /* Various stats. */ time_t startedAt; counter nrBuildsRead{0}; diff --git a/src/root/build.tt b/src/root/build.tt index 93a02e0f..b4176958 100644 --- a/src/root/build.tt +++ b/src/root/build.tt @@ -69,6 +69,8 @@ END; Sending inputs [% ELSIF step.busy == 30 %] Building + [% ELSIF step.busy == 35 %] + Waiting to receive outputs [% ELSIF step.busy == 40 %] Receiving outputs [% ELSIF step.busy == 50 %]