Compare commits

...

10 commits

Author SHA1 Message Date
leo60228 a053ef8fdf
lix api changes
Some checks are pending
Test / tests (push) Waiting to run
2024-05-10 15:00:54 -04:00
leo60228 803b8ee731
Revert "Update to Nix 2.19"
This reverts commit c922e73c11.
2024-05-10 14:47:11 -04:00
leo60228 249620b49e
use lix 2024-05-10 12:49:27 -04:00
Pierre Bourdon b8d03adaf4
queue runner: attempt at slightly smarter scheduling criteria
Instead of just going for "whatever is the oldest build we know of",
use the following first:

- Is the step more constrained? If so, schedule it first to avoid
  filling up "more desirable" build slots with less constrained builds.

- Does the step have more dependents? If so, schedule it first to try
  and maximize open parallelism and breadth of scheduling options.
2024-04-21 17:36:16 +02:00
Pierre Bourdon ee1a7a7813
web: serveFile: also serve a CSP putting served HTML in its own origin 2024-04-21 16:14:24 +02:00
Pierre Bourdon 5c3e508e55
queue-runner: release machine reservation while copying outputs
This allows for better builder usage when the queue runner is busy. To
avoid running into uncontrollable imbalances between builder/queue
runner, we only release the machine reservation after the local
throttler has found a slot to start copying the outputs for that build.
2024-04-21 01:55:19 +02:00
Pierre Bourdon 026e3a3103
queue-runner: switch to pseudorandom ordering of builds processing
We don't rely on sequential / monotonic build IDs processing anymore, so
randomizing actually has the advantage of mixing builds for different
systems together, to avoid only one chunk of builds for a single system
getting processed while builders for other systems are starved.
2024-04-20 23:05:26 +02:00
Pierre Bourdon 6606a7f86e
queue runner: introduce some parallelism for remote paths lookup
Each output for a given step being ingested is looked up in parallel,
which should basically multiply the speed of builds ingestion by the
average number of outputs per derivation.
2024-04-20 22:28:18 +02:00
Pierre Bourdon f31b95d371
queue-runner: reduce the time between queue monitor restarts
This will induce more DB queries (though these are fairly cheap), but at
the benefit of processing bumps within 1m instead of within 10m.
2024-04-20 16:58:10 +02:00
Pierre Bourdon 54f8daf6b1
queue-runner: remove id > X from new builds query
Running the query with/without it shows that it makes no difference to
postgres, since there's an index on finished=0 already. This allows a
few simplifications, but also paves the way towards running multiple
parallel monitor threads in the future.
2024-04-20 16:53:52 +02:00
16 changed files with 147 additions and 159 deletions

View file

@ -16,69 +16,37 @@
"type": "github" "type": "github"
} }
}, },
"lowdown-src": {
"flake": false,
"locked": {
"lastModified": 1633514407,
"narHash": "sha256-Dw32tiMjdK9t3ETl5fzGrutQTzh2rufgZV4A/BbxuD4=",
"owner": "kristapsdz",
"repo": "lowdown",
"rev": "d2c2b44ff6c27b936ec27358a2653caaef8f73b8",
"type": "github"
},
"original": {
"owner": "kristapsdz",
"repo": "lowdown",
"type": "github"
}
},
"nix": { "nix": {
"inputs": { "inputs": {
"flake-compat": "flake-compat", "flake-compat": "flake-compat",
"lowdown-src": "lowdown-src",
"nixpkgs": [ "nixpkgs": [
"nixpkgs" "nixpkgs"
], ],
"nixpkgs-regression": "nixpkgs-regression" "nixpkgs-regression": "nixpkgs-regression",
"pre-commit-hooks": "pre-commit-hooks"
}, },
"locked": { "locked": {
"lastModified": 1706208340, "lastModified": 1714955862,
"narHash": "sha256-wNyHUEIiKKVs6UXrUzhP7RSJQv0A8jckgcuylzftl8k=", "narHash": "sha256-REWlo2RYHfJkxnmZTEJu3Cd/2VM+wjjpPy7Xi4BdDTQ=",
"owner": "NixOS", "ref": "refs/tags/2.90-beta.1",
"repo": "nix", "rev": "b6799ab0374a8e1907a48915d3187e07da41d88c",
"rev": "2c4bb93ba5a97e7078896ebc36385ce172960e4e", "revCount": 15501,
"type": "github" "type": "git",
"url": "https://git@git.lix.systems/lix-project/lix"
}, },
"original": { "original": {
"owner": "NixOS", "ref": "refs/tags/2.90-beta.1",
"ref": "2.19-maintenance", "type": "git",
"repo": "nix", "url": "https://git@git.lix.systems/lix-project/lix"
"type": "github"
} }
}, },
"nixpkgs": { "nixpkgs": {
"locked": { "locked": {
"lastModified": 1701615100, "lastModified": 1715218190,
"narHash": "sha256-7VI84NGBvlCTduw2aHLVB62NvCiZUlALLqBe5v684Aw=", "narHash": "sha256-R98WOBHkk8wIi103JUVQF3ei3oui4HvoZcz9tYOAwlk=",
"owner": "NixOS", "owner": "NixOS",
"repo": "nixpkgs", "repo": "nixpkgs",
"rev": "e9f06adb793d1cca5384907b3b8a4071d5d7cb19", "rev": "9a9960b98418f8c385f52de3b09a63f9c561427a",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-23.05",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs-for-fileset": {
"locked": {
"lastModified": 1706098335,
"narHash": "sha256-r3dWjT8P9/Ah5m5ul4WqIWD8muj5F+/gbCdjiNVBKmU=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "a77ab169a83a4175169d78684ddd2e54486ac651",
"type": "github" "type": "github"
}, },
"original": { "original": {
@ -104,11 +72,26 @@
"type": "github" "type": "github"
} }
}, },
"pre-commit-hooks": {
"flake": false,
"locked": {
"lastModified": 1714478972,
"narHash": "sha256-q//cgb52vv81uOuwz1LaXElp3XAe1TqrABXODAEF6Sk=",
"owner": "cachix",
"repo": "git-hooks.nix",
"rev": "2849da033884f54822af194400f8dff435ada242",
"type": "github"
},
"original": {
"owner": "cachix",
"repo": "git-hooks.nix",
"type": "github"
}
},
"root": { "root": {
"inputs": { "inputs": {
"nix": "nix", "nix": "nix",
"nixpkgs": "nixpkgs", "nixpkgs": "nixpkgs"
"nixpkgs-for-fileset": "nixpkgs-for-fileset"
} }
} }
}, },

View file

@ -1,16 +1,11 @@
{ {
description = "A Nix-based continuous build system"; description = "A Nix-based continuous build system";
inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.05"; inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.11";
inputs.nix.url = "github:NixOS/nix/2.19-maintenance"; inputs.nix.url = "git+https://git@git.lix.systems/lix-project/lix?ref=refs/tags/2.90-beta.1";
inputs.nix.inputs.nixpkgs.follows = "nixpkgs"; inputs.nix.inputs.nixpkgs.follows = "nixpkgs";
# TODO get rid of this once https://github.com/NixOS/nix/pull/9546 is outputs = { self, nixpkgs, nix }:
# mered and we upgrade or Nix, so the main `nixpkgs` input is at least
# 23.11 and has `lib.fileset`.
inputs.nixpkgs-for-fileset.url = "github:NixOS/nixpkgs/nixos-23.11";
outputs = { self, nixpkgs, nix, nixpkgs-for-fileset }:
let let
systems = [ "x86_64-linux" "aarch64-linux" ]; systems = [ "x86_64-linux" "aarch64-linux" ];
forEachSystem = nixpkgs.lib.genAttrs systems; forEachSystem = nixpkgs.lib.genAttrs systems;
@ -67,7 +62,7 @@
}; };
hydra = final.callPackage ./package.nix { hydra = final.callPackage ./package.nix {
inherit (nixpkgs-for-fileset.lib) fileset; inherit (final.lib) fileset;
rawSrc = self; rawSrc = self;
}; };
}; };

View file

@ -9,7 +9,6 @@
#include "eval-inline.hh" #include "eval-inline.hh"
#include "eval-settings.hh" #include "eval-settings.hh"
#include "signals.hh" #include "signals.hh"
#include "terminal.hh"
#include "util.hh" #include "util.hh"
#include "get-drvs.hh" #include "get-drvs.hh"
#include "globals.hh" #include "globals.hh"
@ -97,7 +96,7 @@ static std::string queryMetaStrings(EvalState & state, DrvInfo & drv, const std:
rec = [&](Value & v) { rec = [&](Value & v) {
state.forceValue(v, noPos); state.forceValue(v, noPos);
if (v.type() == nString) if (v.type() == nString)
res.emplace_back(v.string_view()); res.push_back(v.string.s);
else if (v.isList()) else if (v.isList())
for (unsigned int n = 0; n < v.listSize(); ++n) for (unsigned int n = 0; n < v.listSize(); ++n)
rec(*v.listElems()[n]); rec(*v.listElems()[n]);
@ -162,7 +161,7 @@ static void worker(
auto s = readLine(from.get()); auto s = readLine(from.get());
if (s == "exit") break; if (s == "exit") break;
if (!hasPrefix(s, "do ")) abort(); if (!s.starts_with("do ")) abort();
std::string attrPath(s, 3); std::string attrPath(s, 3);
debug("worker process %d at '%s'", getpid(), attrPath); debug("worker process %d at '%s'", getpid(), attrPath);
@ -185,7 +184,7 @@ static void worker(
!experimentalFeatureSettings.isEnabled(Xp::CaDerivations)); !experimentalFeatureSettings.isEnabled(Xp::CaDerivations));
if (drv->querySystem() == "unknown") if (drv->querySystem() == "unknown")
throw EvalError("derivation must have a 'system' attribute"); state.error<EvalError>("derivation must have a 'system' attribute").debugThrow();
auto drvPath = state.store->printStorePath(drv->requireDrvPath()); auto drvPath = state.store->printStorePath(drv->requireDrvPath());
@ -208,7 +207,7 @@ static void worker(
if (a && state.forceBool(*a->value, a->pos, "while evaluating the `_hydraAggregate` attribute")) { if (a && state.forceBool(*a->value, a->pos, "while evaluating the `_hydraAggregate` attribute")) {
auto a = v->attrs->get(state.symbols.create("constituents")); auto a = v->attrs->get(state.symbols.create("constituents"));
if (!a) if (!a)
throw EvalError("derivation must have a constituents attribute"); state.error<EvalError>("derivation must have a constituents attribute").debugThrow();
NixStringContext context; NixStringContext context;
state.coerceToString(a->pos, *a->value, context, "while evaluating the `constituents` attribute", true, false); state.coerceToString(a->pos, *a->value, context, "while evaluating the `constituents` attribute", true, false);
@ -228,7 +227,7 @@ static void worker(
auto v = a->value->listElems()[n]; auto v = a->value->listElems()[n];
state.forceValue(*v, noPos); state.forceValue(*v, noPos);
if (v->type() == nString) if (v->type() == nString)
job["namedConstituents"].push_back(v->string_view()); job["namedConstituents"].push_back(v->str());
} }
} }
@ -274,7 +273,7 @@ static void worker(
else if (v->type() == nNull) else if (v->type() == nNull)
; ;
else throw TypeError("attribute '%s' is %s, which is not supported", attrPath, showType(*v)); else state.error<TypeError>("attribute '%s' is %s, which is not supported", attrPath, showType(*v)).debugThrow();
} catch (EvalError & e) { } catch (EvalError & e) {
auto msg = e.msg(); auto msg = e.msg();
@ -381,8 +380,7 @@ int main(int argc, char * * argv)
// what's shown in the Hydra UI. // what's shown in the Hydra UI.
writeLine(to->get(), "restart"); writeLine(to->get(), "restart");
} }
}, });
ProcessOptions { .allowVfork = false });
from = std::move(fromPipe.readSide); from = std::move(fromPipe.readSide);
to = std::move(toPipe.writeSide); to = std::move(toPipe.writeSide);
debug("created worker process %d", pid); debug("created worker process %d", pid);
@ -533,7 +531,7 @@ int main(int argc, char * * argv)
if (brokenJobs.empty()) { if (brokenJobs.empty()) {
std::string drvName(drvPath.name()); std::string drvName(drvPath.name());
assert(hasSuffix(drvName, drvExtension)); assert(drvName.ends_with(drvExtension));
drvName.resize(drvName.size() - drvExtension.size()); drvName.resize(drvName.size() - drvExtension.size());
auto hashModulo = hashDerivationModulo(*store, drv, true); auto hashModulo = hashDerivationModulo(*store, drv, true);

View file

@ -12,7 +12,10 @@
#include <sys/types.h> #include <sys/types.h>
#include <sys/wait.h> #include <sys/wait.h>
#include <boost/format.hpp>
using namespace nix; using namespace nix;
using boost::format;
typedef std::pair<std::string, std::string> JobsetName; typedef std::pair<std::string, std::string> JobsetName;
@ -506,7 +509,7 @@ int main(int argc, char * * argv)
parseCmdLine(argc, argv, [&](Strings::iterator & arg, const Strings::iterator & end) { parseCmdLine(argc, argv, [&](Strings::iterator & arg, const Strings::iterator & end) {
if (*arg == "--unlock") if (*arg == "--unlock")
unlock = true; unlock = true;
else if (hasPrefix(*arg, "-")) else if (arg->starts_with("-"))
return false; return false;
args.push_back(*arg); args.push_back(*arg);
return true; return true;

View file

@ -9,8 +9,6 @@
#include "path.hh" #include "path.hh"
#include "serve-protocol.hh" #include "serve-protocol.hh"
#include "state.hh" #include "state.hh"
#include "current-process.hh"
#include "processes.hh"
#include "util.hh" #include "util.hh"
#include "serve-protocol.hh" #include "serve-protocol.hh"
#include "serve-protocol-impl.hh" #include "serve-protocol-impl.hh"
@ -95,11 +93,11 @@ static void openConnection(::Machine::ptr machine, Path tmpDir, int stderrFD, SS
throw SysError("cannot start %s", pgmName); throw SysError("cannot start %s", pgmName);
}); });
to.readSide = -1; to.readSide.reset();
from.writeSide = -1; from.writeSide.reset();
child.in = to.writeSide.release(); child.in = AutoCloseFD{to.writeSide.release()};
child.out = from.readSide.release(); child.out = AutoCloseFD{from.readSide.release()};
// XXX: determine the actual max value we can use from /proc. // XXX: determine the actual max value we can use from /proc.
int pipesize = 1024 * 1024; int pipesize = 1024 * 1024;
@ -188,7 +186,7 @@ static std::pair<Path, AutoCloseFD> openLogFile(const std::string & logDir, cons
createDirs(dirOf(logFile)); createDirs(dirOf(logFile));
AutoCloseFD logFD = open(logFile.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0666); AutoCloseFD logFD{open(logFile.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0666)};
if (!logFD) throw SysError("creating log file %s", logFile); if (!logFD) throw SysError("creating log file %s", logFile);
return {std::move(logFile), std::move(logFD)}; return {std::move(logFile), std::move(logFD)};
@ -506,6 +504,7 @@ private:
}; };
void State::buildRemote(ref<Store> destStore, void State::buildRemote(ref<Store> destStore,
MachineReservation::ptr & reservation,
::Machine::ptr machine, Step::ptr step, ::Machine::ptr machine, Step::ptr step,
const BuildOptions & buildOptions, const BuildOptions & buildOptions,
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep, RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,
@ -589,7 +588,7 @@ void State::buildRemote(ref<Store> destStore,
if (ftruncate(logFD.get(), 0) == -1) if (ftruncate(logFD.get(), 0) == -1)
throw SysError("truncating log file %s", result.logFile); throw SysError("truncating log file %s", result.logFile);
logFD = -1; logFD.reset();
/* Do the build. */ /* Do the build. */
printMsg(lvlDebug, "building %s on %s", printMsg(lvlDebug, "building %s on %s",
@ -630,6 +629,15 @@ void State::buildRemote(ref<Store> destStore,
} }
SemaphoreReleaser releaser(&localWorkThrottler); SemaphoreReleaser releaser(&localWorkThrottler);
/* Once we've started copying outputs, release the machine reservation
* so further builds can happen. We do not release the machine earlier
* to avoid situations where the queue runner is bottlenecked on
* copying outputs and we end up building too many things that we
* haven't been able to allow copy slots for. */
assert(reservation.unique());
reservation = 0;
wakeDispatcher();
StorePathSet outputs; StorePathSet outputs;
for (auto & [_, realisation] : buildResult.builtOutputs) for (auto & [_, realisation] : buildResult.builtOutputs)
outputs.insert(realisation.outPath); outputs.insert(realisation.outPath);
@ -676,7 +684,7 @@ void State::buildRemote(ref<Store> destStore,
} }
/* Shut down the connection. */ /* Shut down the connection. */
child.in = -1; child.in.reset();
child.sshPid.wait(); child.sshPid.wait();
} catch (Error & e) { } catch (Error & e) {

View file

@ -1,7 +1,7 @@
#include "hydra-build-result.hh" #include "hydra-build-result.hh"
#include "store-api.hh" #include "store-api.hh"
#include "util.hh" #include "util.hh"
#include "source-accessor.hh" #include "fs-accessor.hh"
#include <regex> #include <regex>
@ -63,7 +63,7 @@ BuildOutput getBuildOutput(
auto productsFile = narMembers.find(outputS + "/nix-support/hydra-build-products"); auto productsFile = narMembers.find(outputS + "/nix-support/hydra-build-products");
if (productsFile == narMembers.end() || if (productsFile == narMembers.end() ||
productsFile->second.type != SourceAccessor::Type::tRegular) productsFile->second.type != FSAccessor::Type::tRegular)
continue; continue;
assert(productsFile->second.contents); assert(productsFile->second.contents);
@ -94,7 +94,7 @@ BuildOutput getBuildOutput(
product.name = product.path == store->printStorePath(output) ? "" : baseNameOf(product.path); product.name = product.path == store->printStorePath(output) ? "" : baseNameOf(product.path);
if (file->second.type == SourceAccessor::Type::tRegular) { if (file->second.type == FSAccessor::Type::tRegular) {
product.isRegular = true; product.isRegular = true;
product.fileSize = file->second.fileSize.value(); product.fileSize = file->second.fileSize.value();
product.sha256hash = file->second.sha256.value(); product.sha256hash = file->second.sha256.value();
@ -116,7 +116,7 @@ BuildOutput getBuildOutput(
auto file = narMembers.find(product.path); auto file = narMembers.find(product.path);
assert(file != narMembers.end()); assert(file != narMembers.end());
if (file->second.type == SourceAccessor::Type::tDirectory) if (file->second.type == FSAccessor::Type::tDirectory)
res.products.push_back(product); res.products.push_back(product);
} }
} }
@ -125,7 +125,7 @@ BuildOutput getBuildOutput(
for (auto & output : outputs) { for (auto & output : outputs) {
auto file = narMembers.find(store->printStorePath(output) + "/nix-support/hydra-release-name"); auto file = narMembers.find(store->printStorePath(output) + "/nix-support/hydra-release-name");
if (file == narMembers.end() || if (file == narMembers.end() ||
file->second.type != SourceAccessor::Type::tRegular) file->second.type != FSAccessor::Type::tRegular)
continue; continue;
res.releaseName = trim(file->second.contents.value()); res.releaseName = trim(file->second.contents.value());
// FIXME: validate release name // FIXME: validate release name
@ -135,7 +135,7 @@ BuildOutput getBuildOutput(
for (auto & output : outputs) { for (auto & output : outputs) {
auto file = narMembers.find(store->printStorePath(output) + "/nix-support/hydra-metrics"); auto file = narMembers.find(store->printStorePath(output) + "/nix-support/hydra-metrics");
if (file == narMembers.end() || if (file == narMembers.end() ||
file->second.type != SourceAccessor::Type::tRegular) file->second.type != FSAccessor::Type::tRegular)
continue; continue;
for (auto & line : tokenizeString<Strings>(file->second.contents.value(), "\n")) { for (auto & line : tokenizeString<Strings>(file->second.contents.value(), "\n")) {
auto fields = tokenizeString<std::vector<std::string>>(line); auto fields = tokenizeString<std::vector<std::string>>(line);

View file

@ -46,10 +46,12 @@ void State::builder(MachineReservation::ptr reservation)
} }
} }
/* Release the machine and wake up the dispatcher. */ /* If the machine hasn't been released yet, release and wake up the dispatcher. */
if (reservation) {
assert(reservation.unique()); assert(reservation.unique());
reservation = 0; reservation = 0;
wakeDispatcher(); wakeDispatcher();
}
/* If there was a temporary failure, retry the step after an /* If there was a temporary failure, retry the step after an
exponentially increasing interval. */ exponentially increasing interval. */
@ -72,11 +74,11 @@ void State::builder(MachineReservation::ptr reservation)
State::StepResult State::doBuildStep(nix::ref<Store> destStore, State::StepResult State::doBuildStep(nix::ref<Store> destStore,
MachineReservation::ptr reservation, MachineReservation::ptr & reservation,
std::shared_ptr<ActiveStep> activeStep) std::shared_ptr<ActiveStep> activeStep)
{ {
auto & step(reservation->step); auto step(reservation->step);
auto & machine(reservation->machine); auto machine(reservation->machine);
{ {
auto step_(step->state.lock()); auto step_(step->state.lock());
@ -208,7 +210,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
try { try {
/* FIXME: referring builds may have conflicting timeouts. */ /* FIXME: referring builds may have conflicting timeouts. */
buildRemote(destStore, machine, step, buildOptions, result, activeStep, updateStep, narMembers); buildRemote(destStore, reservation, machine, step, buildOptions, result, activeStep, updateStep, narMembers);
} catch (Error & e) { } catch (Error & e) {
if (activeStep->state_.lock()->cancelled) { if (activeStep->state_.lock()->cancelled) {
printInfo("marking step %d of build %d as cancelled", stepNr, buildId); printInfo("marking step %d of build %d as cancelled", stepNr, buildId);

View file

@ -133,6 +133,8 @@ system_time State::doDispatch()
comparator is a partial ordering (see MachineInfo). */ comparator is a partial ordering (see MachineInfo). */
int highestGlobalPriority; int highestGlobalPriority;
int highestLocalPriority; int highestLocalPriority;
size_t numRequiredSystemFeatures;
size_t numRevDeps;
BuildID lowestBuildID; BuildID lowestBuildID;
StepInfo(Step::ptr step, Step::State & step_) : step(step) StepInfo(Step::ptr step, Step::State & step_) : step(step)
@ -141,6 +143,8 @@ system_time State::doDispatch()
lowestShareUsed = std::min(lowestShareUsed, jobset->shareUsed()); lowestShareUsed = std::min(lowestShareUsed, jobset->shareUsed());
highestGlobalPriority = step_.highestGlobalPriority; highestGlobalPriority = step_.highestGlobalPriority;
highestLocalPriority = step_.highestLocalPriority; highestLocalPriority = step_.highestLocalPriority;
numRequiredSystemFeatures = step->requiredSystemFeatures.size();
numRevDeps = step_.rdeps.size();
lowestBuildID = step_.lowestBuildID; lowestBuildID = step_.lowestBuildID;
} }
}; };
@ -193,6 +197,8 @@ system_time State::doDispatch()
a.highestGlobalPriority != b.highestGlobalPriority ? a.highestGlobalPriority > b.highestGlobalPriority : a.highestGlobalPriority != b.highestGlobalPriority ? a.highestGlobalPriority > b.highestGlobalPriority :
a.lowestShareUsed != b.lowestShareUsed ? a.lowestShareUsed < b.lowestShareUsed : a.lowestShareUsed != b.lowestShareUsed ? a.lowestShareUsed < b.lowestShareUsed :
a.highestLocalPriority != b.highestLocalPriority ? a.highestLocalPriority > b.highestLocalPriority : a.highestLocalPriority != b.highestLocalPriority ? a.highestLocalPriority > b.highestLocalPriority :
a.numRequiredSystemFeatures != b.numRequiredSystemFeatures ? a.numRequiredSystemFeatures > b.numRequiredSystemFeatures :
a.numRevDeps != b.numRevDeps ? a.numRevDeps > b.numRevDeps :
a.lowestBuildID < b.lowestBuildID; a.lowestBuildID < b.lowestBuildID;
}); });

View file

@ -15,7 +15,6 @@
#include "state.hh" #include "state.hh"
#include "hydra-build-result.hh" #include "hydra-build-result.hh"
#include "store-api.hh" #include "store-api.hh"
#include "remote-store.hh"
#include "globals.hh" #include "globals.hh"
#include "hydra-config.hh" #include "hydra-config.hh"
@ -70,13 +69,6 @@ State::PromMetrics::PromMetrics()
.Register(*registry) .Register(*registry)
.Add({}) .Add({})
) )
, queue_max_id(
prometheus::BuildGauge()
.Name("hydraqueuerunner_queue_max_build_id_info")
.Help("Maximum build record ID in the queue")
.Register(*registry)
.Add({})
)
, dispatcher_time_spent_running( , dispatcher_time_spent_running(
prometheus::BuildCounter() prometheus::BuildCounter()
.Name("hydraqueuerunner_dispatcher_time_spent_running") .Name("hydraqueuerunner_dispatcher_time_spent_running")
@ -542,7 +534,7 @@ void State::markSucceededBuild(pqxx::work & txn, Build::ptr build,
product.type, product.type,
product.subtype, product.subtype,
product.fileSize ? std::make_optional(*product.fileSize) : std::nullopt, product.fileSize ? std::make_optional(*product.fileSize) : std::nullopt,
product.sha256hash ? std::make_optional(product.sha256hash->to_string(HashFormat::Base16, false)) : std::nullopt, product.sha256hash ? std::make_optional(product.sha256hash->to_string(Base16, false)) : std::nullopt,
product.path, product.path,
product.name, product.name,
product.defaultPath); product.defaultPath);
@ -938,20 +930,6 @@ void State::run(BuildID buildOne)
} }
}).detach(); }).detach();
/* Make sure that old daemon connections are closed even when
we're not doing much. */
std::thread([&]() {
while (true) {
sleep(10);
try {
if (auto remoteStore = getDestStore().dynamic_pointer_cast<RemoteStore>())
remoteStore->flushBadConnections();
} catch (std::exception & e) {
printMsg(lvlError, "connection flush thread: %s", e.what());
}
}
}).detach();
/* Monitor the database for status dump requests (e.g. from /* Monitor the database for status dump requests (e.g. from
hydra-queue-runner --status). */ hydra-queue-runner --status). */
while (true) { while (true) {

View file

@ -24,13 +24,13 @@ struct Extractor : ParseSink
void createDirectory(const Path & path) override void createDirectory(const Path & path) override
{ {
members.insert_or_assign(prefix + path, NarMemberData { .type = SourceAccessor::Type::tDirectory }); members.insert_or_assign(prefix + path, NarMemberData { .type = FSAccessor::Type::tDirectory });
} }
void createRegularFile(const Path & path) override void createRegularFile(const Path & path) override
{ {
curMember = &members.insert_or_assign(prefix + path, NarMemberData { curMember = &members.insert_or_assign(prefix + path, NarMemberData {
.type = SourceAccessor::Type::tRegular, .type = FSAccessor::Type::tRegular,
.fileSize = 0, .fileSize = 0,
.contents = filesToKeep.count(path) ? std::optional("") : std::nullopt, .contents = filesToKeep.count(path) ? std::optional("") : std::nullopt,
}).first->second; }).first->second;
@ -66,14 +66,8 @@ struct Extractor : ParseSink
void createSymlink(const Path & path, const std::string & target) override void createSymlink(const Path & path, const std::string & target) override
{ {
members.insert_or_assign(prefix + path, NarMemberData { .type = SourceAccessor::Type::tSymlink }); members.insert_or_assign(prefix + path, NarMemberData { .type = FSAccessor::Type::tSymlink });
} }
void isExecutable() override
{ }
void closeRegularFile() override
{ }
}; };

View file

@ -1,13 +1,13 @@
#pragma once #pragma once
#include "source-accessor.hh" #include "fs-accessor.hh"
#include "types.hh" #include "types.hh"
#include "serialise.hh" #include "serialise.hh"
#include "hash.hh" #include "hash.hh"
struct NarMemberData struct NarMemberData
{ {
nix::SourceAccessor::Type type; nix::FSAccessor::Type type;
std::optional<uint64_t> fileSize; std::optional<uint64_t> fileSize;
std::optional<std::string> contents; std::optional<std::string> contents;
std::optional<nix::Hash> sha256; std::optional<nix::Hash> sha256;

View file

@ -1,6 +1,7 @@
#include "state.hh" #include "state.hh"
#include "hydra-build-result.hh" #include "hydra-build-result.hh"
#include "globals.hh" #include "globals.hh"
#include "thread-pool.hh"
#include <cstring> #include <cstring>
@ -37,15 +38,13 @@ void State::queueMonitorLoop(Connection & conn)
auto destStore = getDestStore(); auto destStore = getDestStore();
unsigned int lastBuildId = 0;
bool quit = false; bool quit = false;
while (!quit) { while (!quit) {
auto t_before_work = std::chrono::steady_clock::now(); auto t_before_work = std::chrono::steady_clock::now();
localStore->clearPathInfoCache(); localStore->clearPathInfoCache();
bool done = getQueuedBuilds(conn, destStore, lastBuildId); bool done = getQueuedBuilds(conn, destStore);
if (buildOne && buildOneDone) quit = true; if (buildOne && buildOneDone) quit = true;
@ -63,12 +62,10 @@ void State::queueMonitorLoop(Connection & conn)
conn.get_notifs(); conn.get_notifs();
if (auto lowestId = buildsAdded.get()) { if (auto lowestId = buildsAdded.get()) {
lastBuildId = std::min(lastBuildId, static_cast<unsigned>(std::stoul(*lowestId) - 1));
printMsg(lvlTalkative, "got notification: new builds added to the queue"); printMsg(lvlTalkative, "got notification: new builds added to the queue");
} }
if (buildsRestarted.get()) { if (buildsRestarted.get()) {
printMsg(lvlTalkative, "got notification: builds restarted"); printMsg(lvlTalkative, "got notification: builds restarted");
lastBuildId = 0; // check all builds
} }
if (buildsCancelled.get() || buildsDeleted.get() || buildsBumped.get()) { if (buildsCancelled.get() || buildsDeleted.get() || buildsBumped.get()) {
printMsg(lvlTalkative, "got notification: builds cancelled or bumped"); printMsg(lvlTalkative, "got notification: builds cancelled or bumped");
@ -95,20 +92,18 @@ struct PreviousFailure : public std::exception {
bool State::getQueuedBuilds(Connection & conn, bool State::getQueuedBuilds(Connection & conn,
ref<Store> destStore, unsigned int & lastBuildId) ref<Store> destStore)
{ {
prom.queue_checks_started.Increment(); prom.queue_checks_started.Increment();
printInfo("checking the queue for builds > %d...", lastBuildId); printInfo("checking the queue for builds...");
/* Grab the queued builds from the database, but don't process /* Grab the queued builds from the database, but don't process
them yet (since we don't want a long-running transaction). */ them yet (since we don't want a long-running transaction). */
std::vector<BuildID> newIDs; std::vector<BuildID> newIDs;
std::map<BuildID, Build::ptr> newBuildsByID; std::unordered_map<BuildID, Build::ptr> newBuildsByID;
std::multimap<StorePath, BuildID> newBuildsByPath; std::multimap<StorePath, BuildID> newBuildsByPath;
unsigned int newLastBuildId = lastBuildId;
{ {
pqxx::work txn(conn); pqxx::work txn(conn);
@ -117,17 +112,12 @@ bool State::getQueuedBuilds(Connection & conn,
"jobsets.name as jobset, job, drvPath, maxsilent, timeout, timestamp, " "jobsets.name as jobset, job, drvPath, maxsilent, timeout, timestamp, "
"globalPriority, priority from Builds " "globalPriority, priority from Builds "
"inner join jobsets on builds.jobset_id = jobsets.id " "inner join jobsets on builds.jobset_id = jobsets.id "
"where builds.id > $1 and finished = 0 order by globalPriority desc, builds.id", "where finished = 0 order by globalPriority desc, random()");
lastBuildId);
for (auto const & row : res) { for (auto const & row : res) {
auto builds_(builds.lock()); auto builds_(builds.lock());
BuildID id = row["id"].as<BuildID>(); BuildID id = row["id"].as<BuildID>();
if (buildOne && id != buildOne) continue; if (buildOne && id != buildOne) continue;
if (id > newLastBuildId) {
newLastBuildId = id;
prom.queue_max_id.Set(id);
}
if (builds_->count(id)) continue; if (builds_->count(id)) continue;
auto build = std::make_shared<Build>( auto build = std::make_shared<Build>(
@ -309,7 +299,7 @@ bool State::getQueuedBuilds(Connection & conn,
try { try {
createBuild(build); createBuild(build);
} catch (Error & e) { } catch (Error & e) {
e.addTrace({}, hintfmt("while loading build %d: ", build->id)); e.addTrace({}, HintFmt("while loading build %d: ", build->id));
throw; throw;
} }
@ -329,15 +319,13 @@ bool State::getQueuedBuilds(Connection & conn,
/* Stop after a certain time to allow priority bumps to be /* Stop after a certain time to allow priority bumps to be
processed. */ processed. */
if (std::chrono::system_clock::now() > start + std::chrono::seconds(600)) { if (std::chrono::system_clock::now() > start + std::chrono::seconds(60)) {
prom.queue_checks_early_exits.Increment(); prom.queue_checks_early_exits.Increment();
break; break;
} }
} }
prom.queue_checks_finished.Increment(); prom.queue_checks_finished.Increment();
lastBuildId = newBuildsByID.empty() ? newLastBuildId : newBuildsByID.begin()->first - 1;
return newBuildsByID.empty(); return newBuildsByID.empty();
} }
@ -416,6 +404,34 @@ void State::processQueueChange(Connection & conn)
} }
std::map<DrvOutput, std::optional<StorePath>> State::getMissingRemotePaths(
ref<Store> destStore,
const std::map<DrvOutput, std::optional<StorePath>> & paths)
{
Sync<std::map<DrvOutput, std::optional<StorePath>>> missing_;
ThreadPool tp;
for (auto & [output, maybeOutputPath] : paths) {
if (!maybeOutputPath) {
auto missing(missing_.lock());
missing->insert({output, maybeOutputPath});
} else {
tp.enqueue([&] {
if (!destStore->isValidPath(*maybeOutputPath)) {
auto missing(missing_.lock());
missing->insert({output, maybeOutputPath});
}
});
}
}
tp.process();
auto missing(missing_.lock());
return *missing;
}
Step::ptr State::createStep(ref<Store> destStore, Step::ptr State::createStep(ref<Store> destStore,
Connection & conn, Build::ptr build, const StorePath & drvPath, Connection & conn, Build::ptr build, const StorePath & drvPath,
Build::ptr referringBuild, Step::ptr referringStep, std::set<StorePath> & finishedDrvs, Build::ptr referringBuild, Step::ptr referringStep, std::set<StorePath> & finishedDrvs,
@ -496,16 +512,15 @@ Step::ptr State::createStep(ref<Store> destStore,
/* Are all outputs valid? */ /* Are all outputs valid? */
auto outputHashes = staticOutputHashes(*localStore, *(step->drv)); auto outputHashes = staticOutputHashes(*localStore, *(step->drv));
bool valid = true; std::map<DrvOutput, std::optional<StorePath>> paths;
std::map<DrvOutput, std::optional<StorePath>> missing;
for (auto & [outputName, maybeOutputPath] : destStore->queryPartialDerivationOutputMap(drvPath, &*localStore)) { for (auto & [outputName, maybeOutputPath] : destStore->queryPartialDerivationOutputMap(drvPath, &*localStore)) {
auto outputHash = outputHashes.at(outputName); auto outputHash = outputHashes.at(outputName);
if (maybeOutputPath && destStore->isValidPath(*maybeOutputPath)) paths.insert({{outputHash, outputName}, maybeOutputPath});
continue;
valid = false;
missing.insert({{outputHash, outputName}, maybeOutputPath});
} }
auto missing = getMissingRemotePaths(destStore, paths);
bool valid = missing.empty();
/* Try to copy the missing paths from the local store or from /* Try to copy the missing paths from the local store or from
substitutes. */ substitutes. */
if (!missing.empty()) { if (!missing.empty()) {

View file

@ -490,7 +490,6 @@ private:
prometheus::Counter& queue_steps_created; prometheus::Counter& queue_steps_created;
prometheus::Counter& queue_checks_early_exits; prometheus::Counter& queue_checks_early_exits;
prometheus::Counter& queue_checks_finished; prometheus::Counter& queue_checks_finished;
prometheus::Gauge& queue_max_id;
prometheus::Counter& dispatcher_time_spent_running; prometheus::Counter& dispatcher_time_spent_running;
prometheus::Counter& dispatcher_time_spent_waiting; prometheus::Counter& dispatcher_time_spent_waiting;
@ -546,8 +545,7 @@ private:
void queueMonitorLoop(Connection & conn); void queueMonitorLoop(Connection & conn);
/* Check the queue for new builds. */ /* Check the queue for new builds. */
bool getQueuedBuilds(Connection & conn, bool getQueuedBuilds(Connection & conn, nix::ref<nix::Store> destStore);
nix::ref<nix::Store> destStore, unsigned int & lastBuildId);
/* Handle cancellation, deletion and priority bumps. */ /* Handle cancellation, deletion and priority bumps. */
void processQueueChange(Connection & conn); void processQueueChange(Connection & conn);
@ -555,6 +553,12 @@ private:
BuildOutput getBuildOutputCached(Connection & conn, nix::ref<nix::Store> destStore, BuildOutput getBuildOutputCached(Connection & conn, nix::ref<nix::Store> destStore,
const nix::StorePath & drvPath); const nix::StorePath & drvPath);
/* Returns paths missing from the remote store. Paths are processed in
* parallel to work around the possible latency of remote stores. */
std::map<nix::DrvOutput, std::optional<nix::StorePath>> getMissingRemotePaths(
nix::ref<nix::Store> destStore,
const std::map<nix::DrvOutput, std::optional<nix::StorePath>> & paths);
Step::ptr createStep(nix::ref<nix::Store> store, Step::ptr createStep(nix::ref<nix::Store> store,
Connection & conn, Build::ptr build, const nix::StorePath & drvPath, Connection & conn, Build::ptr build, const nix::StorePath & drvPath,
Build::ptr referringBuild, Step::ptr referringStep, std::set<nix::StorePath> & finishedDrvs, Build::ptr referringBuild, Step::ptr referringStep, std::set<nix::StorePath> & finishedDrvs,
@ -590,10 +594,11 @@ private:
retried. */ retried. */
enum StepResult { sDone, sRetry, sMaybeCancelled }; enum StepResult { sDone, sRetry, sMaybeCancelled };
StepResult doBuildStep(nix::ref<nix::Store> destStore, StepResult doBuildStep(nix::ref<nix::Store> destStore,
MachineReservation::ptr reservation, MachineReservation::ptr & reservation,
std::shared_ptr<ActiveStep> activeStep); std::shared_ptr<ActiveStep> activeStep);
void buildRemote(nix::ref<nix::Store> destStore, void buildRemote(nix::ref<nix::Store> destStore,
MachineReservation::ptr & reservation,
Machine::ptr machine, Step::ptr step, Machine::ptr machine, Step::ptr step,
const BuildOptions & buildOptions, const BuildOptions & buildOptions,
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep, RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,

View file

@ -236,6 +236,9 @@ sub serveFile {
} }
elsif ($ls->{type} eq "regular") { elsif ($ls->{type} eq "regular") {
# Have the hosted data considered its own origin to avoid being a giant
# XSS hole.
$c->response->header('Content-Security-Policy' => 'sandbox allow-scripts');
$c->stash->{'plain'} = { data => grab(cmd => ["nix", "--experimental-features", "nix-command", $c->stash->{'plain'} = { data => grab(cmd => ["nix", "--experimental-features", "nix-command",
"store", "cat", "--store", getStoreUri(), "$path"]) }; "store", "cat", "--store", getStoreUri(), "$path"]) };

View file

@ -2,7 +2,6 @@
#include <pqxx/pqxx> #include <pqxx/pqxx>
#include "environment-variables.hh"
#include "util.hh" #include "util.hh"
@ -18,7 +17,7 @@ struct Connection : pqxx::connection
std::string lower_prefix = "dbi:Pg:"; std::string lower_prefix = "dbi:Pg:";
std::string upper_prefix = "DBI:Pg:"; std::string upper_prefix = "DBI:Pg:";
if (hasPrefix(s, lower_prefix) || hasPrefix(s, upper_prefix)) { if (s.starts_with(lower_prefix) || s.starts_with(upper_prefix)) {
return concatStringsSep(" ", tokenizeString<Strings>(std::string(s, lower_prefix.size()), ";")); return concatStringsSep(" ", tokenizeString<Strings>(std::string(s, lower_prefix.size()), ";"));
} }

View file

@ -2,7 +2,6 @@
#include <map> #include <map>
#include "file-system.hh"
#include "util.hh" #include "util.hh"
struct HydraConfig struct HydraConfig