Compare commits
10 commits
cc6bafe538
...
a053ef8fdf
Author | SHA1 | Date | |
---|---|---|---|
leo60228 | a053ef8fdf | ||
leo60228 | 803b8ee731 | ||
leo60228 | 249620b49e | ||
Pierre Bourdon | b8d03adaf4 | ||
Pierre Bourdon | ee1a7a7813 | ||
Pierre Bourdon | 5c3e508e55 | ||
Pierre Bourdon | 026e3a3103 | ||
Pierre Bourdon | 6606a7f86e | ||
Pierre Bourdon | f31b95d371 | ||
Pierre Bourdon | 54f8daf6b1 |
81
flake.lock
81
flake.lock
|
@ -16,69 +16,37 @@
|
|||
"type": "github"
|
||||
}
|
||||
},
|
||||
"lowdown-src": {
|
||||
"flake": false,
|
||||
"locked": {
|
||||
"lastModified": 1633514407,
|
||||
"narHash": "sha256-Dw32tiMjdK9t3ETl5fzGrutQTzh2rufgZV4A/BbxuD4=",
|
||||
"owner": "kristapsdz",
|
||||
"repo": "lowdown",
|
||||
"rev": "d2c2b44ff6c27b936ec27358a2653caaef8f73b8",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "kristapsdz",
|
||||
"repo": "lowdown",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"nix": {
|
||||
"inputs": {
|
||||
"flake-compat": "flake-compat",
|
||||
"lowdown-src": "lowdown-src",
|
||||
"nixpkgs": [
|
||||
"nixpkgs"
|
||||
],
|
||||
"nixpkgs-regression": "nixpkgs-regression"
|
||||
"nixpkgs-regression": "nixpkgs-regression",
|
||||
"pre-commit-hooks": "pre-commit-hooks"
|
||||
},
|
||||
"locked": {
|
||||
"lastModified": 1706208340,
|
||||
"narHash": "sha256-wNyHUEIiKKVs6UXrUzhP7RSJQv0A8jckgcuylzftl8k=",
|
||||
"owner": "NixOS",
|
||||
"repo": "nix",
|
||||
"rev": "2c4bb93ba5a97e7078896ebc36385ce172960e4e",
|
||||
"type": "github"
|
||||
"lastModified": 1714955862,
|
||||
"narHash": "sha256-REWlo2RYHfJkxnmZTEJu3Cd/2VM+wjjpPy7Xi4BdDTQ=",
|
||||
"ref": "refs/tags/2.90-beta.1",
|
||||
"rev": "b6799ab0374a8e1907a48915d3187e07da41d88c",
|
||||
"revCount": 15501,
|
||||
"type": "git",
|
||||
"url": "https://git@git.lix.systems/lix-project/lix"
|
||||
},
|
||||
"original": {
|
||||
"owner": "NixOS",
|
||||
"ref": "2.19-maintenance",
|
||||
"repo": "nix",
|
||||
"type": "github"
|
||||
"ref": "refs/tags/2.90-beta.1",
|
||||
"type": "git",
|
||||
"url": "https://git@git.lix.systems/lix-project/lix"
|
||||
}
|
||||
},
|
||||
"nixpkgs": {
|
||||
"locked": {
|
||||
"lastModified": 1701615100,
|
||||
"narHash": "sha256-7VI84NGBvlCTduw2aHLVB62NvCiZUlALLqBe5v684Aw=",
|
||||
"lastModified": 1715218190,
|
||||
"narHash": "sha256-R98WOBHkk8wIi103JUVQF3ei3oui4HvoZcz9tYOAwlk=",
|
||||
"owner": "NixOS",
|
||||
"repo": "nixpkgs",
|
||||
"rev": "e9f06adb793d1cca5384907b3b8a4071d5d7cb19",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "NixOS",
|
||||
"ref": "nixos-23.05",
|
||||
"repo": "nixpkgs",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"nixpkgs-for-fileset": {
|
||||
"locked": {
|
||||
"lastModified": 1706098335,
|
||||
"narHash": "sha256-r3dWjT8P9/Ah5m5ul4WqIWD8muj5F+/gbCdjiNVBKmU=",
|
||||
"owner": "NixOS",
|
||||
"repo": "nixpkgs",
|
||||
"rev": "a77ab169a83a4175169d78684ddd2e54486ac651",
|
||||
"rev": "9a9960b98418f8c385f52de3b09a63f9c561427a",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
|
@ -104,11 +72,26 @@
|
|||
"type": "github"
|
||||
}
|
||||
},
|
||||
"pre-commit-hooks": {
|
||||
"flake": false,
|
||||
"locked": {
|
||||
"lastModified": 1714478972,
|
||||
"narHash": "sha256-q//cgb52vv81uOuwz1LaXElp3XAe1TqrABXODAEF6Sk=",
|
||||
"owner": "cachix",
|
||||
"repo": "git-hooks.nix",
|
||||
"rev": "2849da033884f54822af194400f8dff435ada242",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "cachix",
|
||||
"repo": "git-hooks.nix",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"root": {
|
||||
"inputs": {
|
||||
"nix": "nix",
|
||||
"nixpkgs": "nixpkgs",
|
||||
"nixpkgs-for-fileset": "nixpkgs-for-fileset"
|
||||
"nixpkgs": "nixpkgs"
|
||||
}
|
||||
}
|
||||
},
|
||||
|
|
13
flake.nix
13
flake.nix
|
@ -1,16 +1,11 @@
|
|||
{
|
||||
description = "A Nix-based continuous build system";
|
||||
|
||||
inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.05";
|
||||
inputs.nix.url = "github:NixOS/nix/2.19-maintenance";
|
||||
inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.11";
|
||||
inputs.nix.url = "git+https://git@git.lix.systems/lix-project/lix?ref=refs/tags/2.90-beta.1";
|
||||
inputs.nix.inputs.nixpkgs.follows = "nixpkgs";
|
||||
|
||||
# TODO get rid of this once https://github.com/NixOS/nix/pull/9546 is
|
||||
# mered and we upgrade or Nix, so the main `nixpkgs` input is at least
|
||||
# 23.11 and has `lib.fileset`.
|
||||
inputs.nixpkgs-for-fileset.url = "github:NixOS/nixpkgs/nixos-23.11";
|
||||
|
||||
outputs = { self, nixpkgs, nix, nixpkgs-for-fileset }:
|
||||
outputs = { self, nixpkgs, nix }:
|
||||
let
|
||||
systems = [ "x86_64-linux" "aarch64-linux" ];
|
||||
forEachSystem = nixpkgs.lib.genAttrs systems;
|
||||
|
@ -67,7 +62,7 @@
|
|||
};
|
||||
|
||||
hydra = final.callPackage ./package.nix {
|
||||
inherit (nixpkgs-for-fileset.lib) fileset;
|
||||
inherit (final.lib) fileset;
|
||||
rawSrc = self;
|
||||
};
|
||||
};
|
||||
|
|
|
@ -9,7 +9,6 @@
|
|||
#include "eval-inline.hh"
|
||||
#include "eval-settings.hh"
|
||||
#include "signals.hh"
|
||||
#include "terminal.hh"
|
||||
#include "util.hh"
|
||||
#include "get-drvs.hh"
|
||||
#include "globals.hh"
|
||||
|
@ -97,7 +96,7 @@ static std::string queryMetaStrings(EvalState & state, DrvInfo & drv, const std:
|
|||
rec = [&](Value & v) {
|
||||
state.forceValue(v, noPos);
|
||||
if (v.type() == nString)
|
||||
res.emplace_back(v.string_view());
|
||||
res.push_back(v.string.s);
|
||||
else if (v.isList())
|
||||
for (unsigned int n = 0; n < v.listSize(); ++n)
|
||||
rec(*v.listElems()[n]);
|
||||
|
@ -162,7 +161,7 @@ static void worker(
|
|||
|
||||
auto s = readLine(from.get());
|
||||
if (s == "exit") break;
|
||||
if (!hasPrefix(s, "do ")) abort();
|
||||
if (!s.starts_with("do ")) abort();
|
||||
std::string attrPath(s, 3);
|
||||
|
||||
debug("worker process %d at '%s'", getpid(), attrPath);
|
||||
|
@ -185,7 +184,7 @@ static void worker(
|
|||
!experimentalFeatureSettings.isEnabled(Xp::CaDerivations));
|
||||
|
||||
if (drv->querySystem() == "unknown")
|
||||
throw EvalError("derivation must have a 'system' attribute");
|
||||
state.error<EvalError>("derivation must have a 'system' attribute").debugThrow();
|
||||
|
||||
auto drvPath = state.store->printStorePath(drv->requireDrvPath());
|
||||
|
||||
|
@ -208,7 +207,7 @@ static void worker(
|
|||
if (a && state.forceBool(*a->value, a->pos, "while evaluating the `_hydraAggregate` attribute")) {
|
||||
auto a = v->attrs->get(state.symbols.create("constituents"));
|
||||
if (!a)
|
||||
throw EvalError("derivation must have a ‘constituents’ attribute");
|
||||
state.error<EvalError>("derivation must have a ‘constituents’ attribute").debugThrow();
|
||||
|
||||
NixStringContext context;
|
||||
state.coerceToString(a->pos, *a->value, context, "while evaluating the `constituents` attribute", true, false);
|
||||
|
@ -228,7 +227,7 @@ static void worker(
|
|||
auto v = a->value->listElems()[n];
|
||||
state.forceValue(*v, noPos);
|
||||
if (v->type() == nString)
|
||||
job["namedConstituents"].push_back(v->string_view());
|
||||
job["namedConstituents"].push_back(v->str());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -274,7 +273,7 @@ static void worker(
|
|||
else if (v->type() == nNull)
|
||||
;
|
||||
|
||||
else throw TypeError("attribute '%s' is %s, which is not supported", attrPath, showType(*v));
|
||||
else state.error<TypeError>("attribute '%s' is %s, which is not supported", attrPath, showType(*v)).debugThrow();
|
||||
|
||||
} catch (EvalError & e) {
|
||||
auto msg = e.msg();
|
||||
|
@ -381,8 +380,7 @@ int main(int argc, char * * argv)
|
|||
// what's shown in the Hydra UI.
|
||||
writeLine(to->get(), "restart");
|
||||
}
|
||||
},
|
||||
ProcessOptions { .allowVfork = false });
|
||||
});
|
||||
from = std::move(fromPipe.readSide);
|
||||
to = std::move(toPipe.writeSide);
|
||||
debug("created worker process %d", pid);
|
||||
|
@ -533,7 +531,7 @@ int main(int argc, char * * argv)
|
|||
|
||||
if (brokenJobs.empty()) {
|
||||
std::string drvName(drvPath.name());
|
||||
assert(hasSuffix(drvName, drvExtension));
|
||||
assert(drvName.ends_with(drvExtension));
|
||||
drvName.resize(drvName.size() - drvExtension.size());
|
||||
|
||||
auto hashModulo = hashDerivationModulo(*store, drv, true);
|
||||
|
|
|
@ -12,7 +12,10 @@
|
|||
#include <sys/types.h>
|
||||
#include <sys/wait.h>
|
||||
|
||||
#include <boost/format.hpp>
|
||||
|
||||
using namespace nix;
|
||||
using boost::format;
|
||||
|
||||
typedef std::pair<std::string, std::string> JobsetName;
|
||||
|
||||
|
@ -506,7 +509,7 @@ int main(int argc, char * * argv)
|
|||
parseCmdLine(argc, argv, [&](Strings::iterator & arg, const Strings::iterator & end) {
|
||||
if (*arg == "--unlock")
|
||||
unlock = true;
|
||||
else if (hasPrefix(*arg, "-"))
|
||||
else if (arg->starts_with("-"))
|
||||
return false;
|
||||
args.push_back(*arg);
|
||||
return true;
|
||||
|
|
|
@ -9,8 +9,6 @@
|
|||
#include "path.hh"
|
||||
#include "serve-protocol.hh"
|
||||
#include "state.hh"
|
||||
#include "current-process.hh"
|
||||
#include "processes.hh"
|
||||
#include "util.hh"
|
||||
#include "serve-protocol.hh"
|
||||
#include "serve-protocol-impl.hh"
|
||||
|
@ -95,11 +93,11 @@ static void openConnection(::Machine::ptr machine, Path tmpDir, int stderrFD, SS
|
|||
throw SysError("cannot start %s", pgmName);
|
||||
});
|
||||
|
||||
to.readSide = -1;
|
||||
from.writeSide = -1;
|
||||
to.readSide.reset();
|
||||
from.writeSide.reset();
|
||||
|
||||
child.in = to.writeSide.release();
|
||||
child.out = from.readSide.release();
|
||||
child.in = AutoCloseFD{to.writeSide.release()};
|
||||
child.out = AutoCloseFD{from.readSide.release()};
|
||||
|
||||
// XXX: determine the actual max value we can use from /proc.
|
||||
int pipesize = 1024 * 1024;
|
||||
|
@ -188,7 +186,7 @@ static std::pair<Path, AutoCloseFD> openLogFile(const std::string & logDir, cons
|
|||
|
||||
createDirs(dirOf(logFile));
|
||||
|
||||
AutoCloseFD logFD = open(logFile.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0666);
|
||||
AutoCloseFD logFD{open(logFile.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0666)};
|
||||
if (!logFD) throw SysError("creating log file ‘%s’", logFile);
|
||||
|
||||
return {std::move(logFile), std::move(logFD)};
|
||||
|
@ -506,6 +504,7 @@ private:
|
|||
};
|
||||
|
||||
void State::buildRemote(ref<Store> destStore,
|
||||
MachineReservation::ptr & reservation,
|
||||
::Machine::ptr machine, Step::ptr step,
|
||||
const BuildOptions & buildOptions,
|
||||
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,
|
||||
|
@ -589,7 +588,7 @@ void State::buildRemote(ref<Store> destStore,
|
|||
if (ftruncate(logFD.get(), 0) == -1)
|
||||
throw SysError("truncating log file ‘%s’", result.logFile);
|
||||
|
||||
logFD = -1;
|
||||
logFD.reset();
|
||||
|
||||
/* Do the build. */
|
||||
printMsg(lvlDebug, "building ‘%s’ on ‘%s’",
|
||||
|
@ -630,6 +629,15 @@ void State::buildRemote(ref<Store> destStore,
|
|||
}
|
||||
SemaphoreReleaser releaser(&localWorkThrottler);
|
||||
|
||||
/* Once we've started copying outputs, release the machine reservation
|
||||
* so further builds can happen. We do not release the machine earlier
|
||||
* to avoid situations where the queue runner is bottlenecked on
|
||||
* copying outputs and we end up building too many things that we
|
||||
* haven't been able to allow copy slots for. */
|
||||
assert(reservation.unique());
|
||||
reservation = 0;
|
||||
wakeDispatcher();
|
||||
|
||||
StorePathSet outputs;
|
||||
for (auto & [_, realisation] : buildResult.builtOutputs)
|
||||
outputs.insert(realisation.outPath);
|
||||
|
@ -676,7 +684,7 @@ void State::buildRemote(ref<Store> destStore,
|
|||
}
|
||||
|
||||
/* Shut down the connection. */
|
||||
child.in = -1;
|
||||
child.in.reset();
|
||||
child.sshPid.wait();
|
||||
|
||||
} catch (Error & e) {
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
#include "hydra-build-result.hh"
|
||||
#include "store-api.hh"
|
||||
#include "util.hh"
|
||||
#include "source-accessor.hh"
|
||||
#include "fs-accessor.hh"
|
||||
|
||||
#include <regex>
|
||||
|
||||
|
@ -63,7 +63,7 @@ BuildOutput getBuildOutput(
|
|||
|
||||
auto productsFile = narMembers.find(outputS + "/nix-support/hydra-build-products");
|
||||
if (productsFile == narMembers.end() ||
|
||||
productsFile->second.type != SourceAccessor::Type::tRegular)
|
||||
productsFile->second.type != FSAccessor::Type::tRegular)
|
||||
continue;
|
||||
assert(productsFile->second.contents);
|
||||
|
||||
|
@ -94,7 +94,7 @@ BuildOutput getBuildOutput(
|
|||
|
||||
product.name = product.path == store->printStorePath(output) ? "" : baseNameOf(product.path);
|
||||
|
||||
if (file->second.type == SourceAccessor::Type::tRegular) {
|
||||
if (file->second.type == FSAccessor::Type::tRegular) {
|
||||
product.isRegular = true;
|
||||
product.fileSize = file->second.fileSize.value();
|
||||
product.sha256hash = file->second.sha256.value();
|
||||
|
@ -116,7 +116,7 @@ BuildOutput getBuildOutput(
|
|||
|
||||
auto file = narMembers.find(product.path);
|
||||
assert(file != narMembers.end());
|
||||
if (file->second.type == SourceAccessor::Type::tDirectory)
|
||||
if (file->second.type == FSAccessor::Type::tDirectory)
|
||||
res.products.push_back(product);
|
||||
}
|
||||
}
|
||||
|
@ -125,7 +125,7 @@ BuildOutput getBuildOutput(
|
|||
for (auto & output : outputs) {
|
||||
auto file = narMembers.find(store->printStorePath(output) + "/nix-support/hydra-release-name");
|
||||
if (file == narMembers.end() ||
|
||||
file->second.type != SourceAccessor::Type::tRegular)
|
||||
file->second.type != FSAccessor::Type::tRegular)
|
||||
continue;
|
||||
res.releaseName = trim(file->second.contents.value());
|
||||
// FIXME: validate release name
|
||||
|
@ -135,7 +135,7 @@ BuildOutput getBuildOutput(
|
|||
for (auto & output : outputs) {
|
||||
auto file = narMembers.find(store->printStorePath(output) + "/nix-support/hydra-metrics");
|
||||
if (file == narMembers.end() ||
|
||||
file->second.type != SourceAccessor::Type::tRegular)
|
||||
file->second.type != FSAccessor::Type::tRegular)
|
||||
continue;
|
||||
for (auto & line : tokenizeString<Strings>(file->second.contents.value(), "\n")) {
|
||||
auto fields = tokenizeString<std::vector<std::string>>(line);
|
||||
|
|
|
@ -46,10 +46,12 @@ void State::builder(MachineReservation::ptr reservation)
|
|||
}
|
||||
}
|
||||
|
||||
/* Release the machine and wake up the dispatcher. */
|
||||
/* If the machine hasn't been released yet, release and wake up the dispatcher. */
|
||||
if (reservation) {
|
||||
assert(reservation.unique());
|
||||
reservation = 0;
|
||||
wakeDispatcher();
|
||||
}
|
||||
|
||||
/* If there was a temporary failure, retry the step after an
|
||||
exponentially increasing interval. */
|
||||
|
@ -72,11 +74,11 @@ void State::builder(MachineReservation::ptr reservation)
|
|||
|
||||
|
||||
State::StepResult State::doBuildStep(nix::ref<Store> destStore,
|
||||
MachineReservation::ptr reservation,
|
||||
MachineReservation::ptr & reservation,
|
||||
std::shared_ptr<ActiveStep> activeStep)
|
||||
{
|
||||
auto & step(reservation->step);
|
||||
auto & machine(reservation->machine);
|
||||
auto step(reservation->step);
|
||||
auto machine(reservation->machine);
|
||||
|
||||
{
|
||||
auto step_(step->state.lock());
|
||||
|
@ -208,7 +210,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
|
|||
|
||||
try {
|
||||
/* FIXME: referring builds may have conflicting timeouts. */
|
||||
buildRemote(destStore, machine, step, buildOptions, result, activeStep, updateStep, narMembers);
|
||||
buildRemote(destStore, reservation, machine, step, buildOptions, result, activeStep, updateStep, narMembers);
|
||||
} catch (Error & e) {
|
||||
if (activeStep->state_.lock()->cancelled) {
|
||||
printInfo("marking step %d of build %d as cancelled", stepNr, buildId);
|
||||
|
|
|
@ -133,6 +133,8 @@ system_time State::doDispatch()
|
|||
comparator is a partial ordering (see MachineInfo). */
|
||||
int highestGlobalPriority;
|
||||
int highestLocalPriority;
|
||||
size_t numRequiredSystemFeatures;
|
||||
size_t numRevDeps;
|
||||
BuildID lowestBuildID;
|
||||
|
||||
StepInfo(Step::ptr step, Step::State & step_) : step(step)
|
||||
|
@ -141,6 +143,8 @@ system_time State::doDispatch()
|
|||
lowestShareUsed = std::min(lowestShareUsed, jobset->shareUsed());
|
||||
highestGlobalPriority = step_.highestGlobalPriority;
|
||||
highestLocalPriority = step_.highestLocalPriority;
|
||||
numRequiredSystemFeatures = step->requiredSystemFeatures.size();
|
||||
numRevDeps = step_.rdeps.size();
|
||||
lowestBuildID = step_.lowestBuildID;
|
||||
}
|
||||
};
|
||||
|
@ -193,6 +197,8 @@ system_time State::doDispatch()
|
|||
a.highestGlobalPriority != b.highestGlobalPriority ? a.highestGlobalPriority > b.highestGlobalPriority :
|
||||
a.lowestShareUsed != b.lowestShareUsed ? a.lowestShareUsed < b.lowestShareUsed :
|
||||
a.highestLocalPriority != b.highestLocalPriority ? a.highestLocalPriority > b.highestLocalPriority :
|
||||
a.numRequiredSystemFeatures != b.numRequiredSystemFeatures ? a.numRequiredSystemFeatures > b.numRequiredSystemFeatures :
|
||||
a.numRevDeps != b.numRevDeps ? a.numRevDeps > b.numRevDeps :
|
||||
a.lowestBuildID < b.lowestBuildID;
|
||||
});
|
||||
|
||||
|
|
|
@ -15,7 +15,6 @@
|
|||
#include "state.hh"
|
||||
#include "hydra-build-result.hh"
|
||||
#include "store-api.hh"
|
||||
#include "remote-store.hh"
|
||||
|
||||
#include "globals.hh"
|
||||
#include "hydra-config.hh"
|
||||
|
@ -70,13 +69,6 @@ State::PromMetrics::PromMetrics()
|
|||
.Register(*registry)
|
||||
.Add({})
|
||||
)
|
||||
, queue_max_id(
|
||||
prometheus::BuildGauge()
|
||||
.Name("hydraqueuerunner_queue_max_build_id_info")
|
||||
.Help("Maximum build record ID in the queue")
|
||||
.Register(*registry)
|
||||
.Add({})
|
||||
)
|
||||
, dispatcher_time_spent_running(
|
||||
prometheus::BuildCounter()
|
||||
.Name("hydraqueuerunner_dispatcher_time_spent_running")
|
||||
|
@ -542,7 +534,7 @@ void State::markSucceededBuild(pqxx::work & txn, Build::ptr build,
|
|||
product.type,
|
||||
product.subtype,
|
||||
product.fileSize ? std::make_optional(*product.fileSize) : std::nullopt,
|
||||
product.sha256hash ? std::make_optional(product.sha256hash->to_string(HashFormat::Base16, false)) : std::nullopt,
|
||||
product.sha256hash ? std::make_optional(product.sha256hash->to_string(Base16, false)) : std::nullopt,
|
||||
product.path,
|
||||
product.name,
|
||||
product.defaultPath);
|
||||
|
@ -938,20 +930,6 @@ void State::run(BuildID buildOne)
|
|||
}
|
||||
}).detach();
|
||||
|
||||
/* Make sure that old daemon connections are closed even when
|
||||
we're not doing much. */
|
||||
std::thread([&]() {
|
||||
while (true) {
|
||||
sleep(10);
|
||||
try {
|
||||
if (auto remoteStore = getDestStore().dynamic_pointer_cast<RemoteStore>())
|
||||
remoteStore->flushBadConnections();
|
||||
} catch (std::exception & e) {
|
||||
printMsg(lvlError, "connection flush thread: %s", e.what());
|
||||
}
|
||||
}
|
||||
}).detach();
|
||||
|
||||
/* Monitor the database for status dump requests (e.g. from
|
||||
‘hydra-queue-runner --status’). */
|
||||
while (true) {
|
||||
|
|
|
@ -24,13 +24,13 @@ struct Extractor : ParseSink
|
|||
|
||||
void createDirectory(const Path & path) override
|
||||
{
|
||||
members.insert_or_assign(prefix + path, NarMemberData { .type = SourceAccessor::Type::tDirectory });
|
||||
members.insert_or_assign(prefix + path, NarMemberData { .type = FSAccessor::Type::tDirectory });
|
||||
}
|
||||
|
||||
void createRegularFile(const Path & path) override
|
||||
{
|
||||
curMember = &members.insert_or_assign(prefix + path, NarMemberData {
|
||||
.type = SourceAccessor::Type::tRegular,
|
||||
.type = FSAccessor::Type::tRegular,
|
||||
.fileSize = 0,
|
||||
.contents = filesToKeep.count(path) ? std::optional("") : std::nullopt,
|
||||
}).first->second;
|
||||
|
@ -66,14 +66,8 @@ struct Extractor : ParseSink
|
|||
|
||||
void createSymlink(const Path & path, const std::string & target) override
|
||||
{
|
||||
members.insert_or_assign(prefix + path, NarMemberData { .type = SourceAccessor::Type::tSymlink });
|
||||
members.insert_or_assign(prefix + path, NarMemberData { .type = FSAccessor::Type::tSymlink });
|
||||
}
|
||||
|
||||
void isExecutable() override
|
||||
{ }
|
||||
|
||||
void closeRegularFile() override
|
||||
{ }
|
||||
};
|
||||
|
||||
|
||||
|
|
|
@ -1,13 +1,13 @@
|
|||
#pragma once
|
||||
|
||||
#include "source-accessor.hh"
|
||||
#include "fs-accessor.hh"
|
||||
#include "types.hh"
|
||||
#include "serialise.hh"
|
||||
#include "hash.hh"
|
||||
|
||||
struct NarMemberData
|
||||
{
|
||||
nix::SourceAccessor::Type type;
|
||||
nix::FSAccessor::Type type;
|
||||
std::optional<uint64_t> fileSize;
|
||||
std::optional<std::string> contents;
|
||||
std::optional<nix::Hash> sha256;
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
#include "state.hh"
|
||||
#include "hydra-build-result.hh"
|
||||
#include "globals.hh"
|
||||
#include "thread-pool.hh"
|
||||
|
||||
#include <cstring>
|
||||
|
||||
|
@ -37,15 +38,13 @@ void State::queueMonitorLoop(Connection & conn)
|
|||
|
||||
auto destStore = getDestStore();
|
||||
|
||||
unsigned int lastBuildId = 0;
|
||||
|
||||
bool quit = false;
|
||||
while (!quit) {
|
||||
auto t_before_work = std::chrono::steady_clock::now();
|
||||
|
||||
localStore->clearPathInfoCache();
|
||||
|
||||
bool done = getQueuedBuilds(conn, destStore, lastBuildId);
|
||||
bool done = getQueuedBuilds(conn, destStore);
|
||||
|
||||
if (buildOne && buildOneDone) quit = true;
|
||||
|
||||
|
@ -63,12 +62,10 @@ void State::queueMonitorLoop(Connection & conn)
|
|||
conn.get_notifs();
|
||||
|
||||
if (auto lowestId = buildsAdded.get()) {
|
||||
lastBuildId = std::min(lastBuildId, static_cast<unsigned>(std::stoul(*lowestId) - 1));
|
||||
printMsg(lvlTalkative, "got notification: new builds added to the queue");
|
||||
}
|
||||
if (buildsRestarted.get()) {
|
||||
printMsg(lvlTalkative, "got notification: builds restarted");
|
||||
lastBuildId = 0; // check all builds
|
||||
}
|
||||
if (buildsCancelled.get() || buildsDeleted.get() || buildsBumped.get()) {
|
||||
printMsg(lvlTalkative, "got notification: builds cancelled or bumped");
|
||||
|
@ -95,20 +92,18 @@ struct PreviousFailure : public std::exception {
|
|||
|
||||
|
||||
bool State::getQueuedBuilds(Connection & conn,
|
||||
ref<Store> destStore, unsigned int & lastBuildId)
|
||||
ref<Store> destStore)
|
||||
{
|
||||
prom.queue_checks_started.Increment();
|
||||
|
||||
printInfo("checking the queue for builds > %d...", lastBuildId);
|
||||
printInfo("checking the queue for builds...");
|
||||
|
||||
/* Grab the queued builds from the database, but don't process
|
||||
them yet (since we don't want a long-running transaction). */
|
||||
std::vector<BuildID> newIDs;
|
||||
std::map<BuildID, Build::ptr> newBuildsByID;
|
||||
std::unordered_map<BuildID, Build::ptr> newBuildsByID;
|
||||
std::multimap<StorePath, BuildID> newBuildsByPath;
|
||||
|
||||
unsigned int newLastBuildId = lastBuildId;
|
||||
|
||||
{
|
||||
pqxx::work txn(conn);
|
||||
|
||||
|
@ -117,17 +112,12 @@ bool State::getQueuedBuilds(Connection & conn,
|
|||
"jobsets.name as jobset, job, drvPath, maxsilent, timeout, timestamp, "
|
||||
"globalPriority, priority from Builds "
|
||||
"inner join jobsets on builds.jobset_id = jobsets.id "
|
||||
"where builds.id > $1 and finished = 0 order by globalPriority desc, builds.id",
|
||||
lastBuildId);
|
||||
"where finished = 0 order by globalPriority desc, random()");
|
||||
|
||||
for (auto const & row : res) {
|
||||
auto builds_(builds.lock());
|
||||
BuildID id = row["id"].as<BuildID>();
|
||||
if (buildOne && id != buildOne) continue;
|
||||
if (id > newLastBuildId) {
|
||||
newLastBuildId = id;
|
||||
prom.queue_max_id.Set(id);
|
||||
}
|
||||
if (builds_->count(id)) continue;
|
||||
|
||||
auto build = std::make_shared<Build>(
|
||||
|
@ -309,7 +299,7 @@ bool State::getQueuedBuilds(Connection & conn,
|
|||
try {
|
||||
createBuild(build);
|
||||
} catch (Error & e) {
|
||||
e.addTrace({}, hintfmt("while loading build %d: ", build->id));
|
||||
e.addTrace({}, HintFmt("while loading build %d: ", build->id));
|
||||
throw;
|
||||
}
|
||||
|
||||
|
@ -329,15 +319,13 @@ bool State::getQueuedBuilds(Connection & conn,
|
|||
|
||||
/* Stop after a certain time to allow priority bumps to be
|
||||
processed. */
|
||||
if (std::chrono::system_clock::now() > start + std::chrono::seconds(600)) {
|
||||
if (std::chrono::system_clock::now() > start + std::chrono::seconds(60)) {
|
||||
prom.queue_checks_early_exits.Increment();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
prom.queue_checks_finished.Increment();
|
||||
|
||||
lastBuildId = newBuildsByID.empty() ? newLastBuildId : newBuildsByID.begin()->first - 1;
|
||||
return newBuildsByID.empty();
|
||||
}
|
||||
|
||||
|
@ -416,6 +404,34 @@ void State::processQueueChange(Connection & conn)
|
|||
}
|
||||
|
||||
|
||||
std::map<DrvOutput, std::optional<StorePath>> State::getMissingRemotePaths(
|
||||
ref<Store> destStore,
|
||||
const std::map<DrvOutput, std::optional<StorePath>> & paths)
|
||||
{
|
||||
Sync<std::map<DrvOutput, std::optional<StorePath>>> missing_;
|
||||
ThreadPool tp;
|
||||
|
||||
for (auto & [output, maybeOutputPath] : paths) {
|
||||
if (!maybeOutputPath) {
|
||||
auto missing(missing_.lock());
|
||||
missing->insert({output, maybeOutputPath});
|
||||
} else {
|
||||
tp.enqueue([&] {
|
||||
if (!destStore->isValidPath(*maybeOutputPath)) {
|
||||
auto missing(missing_.lock());
|
||||
missing->insert({output, maybeOutputPath});
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
tp.process();
|
||||
|
||||
auto missing(missing_.lock());
|
||||
return *missing;
|
||||
}
|
||||
|
||||
|
||||
Step::ptr State::createStep(ref<Store> destStore,
|
||||
Connection & conn, Build::ptr build, const StorePath & drvPath,
|
||||
Build::ptr referringBuild, Step::ptr referringStep, std::set<StorePath> & finishedDrvs,
|
||||
|
@ -496,16 +512,15 @@ Step::ptr State::createStep(ref<Store> destStore,
|
|||
|
||||
/* Are all outputs valid? */
|
||||
auto outputHashes = staticOutputHashes(*localStore, *(step->drv));
|
||||
bool valid = true;
|
||||
std::map<DrvOutput, std::optional<StorePath>> missing;
|
||||
std::map<DrvOutput, std::optional<StorePath>> paths;
|
||||
for (auto & [outputName, maybeOutputPath] : destStore->queryPartialDerivationOutputMap(drvPath, &*localStore)) {
|
||||
auto outputHash = outputHashes.at(outputName);
|
||||
if (maybeOutputPath && destStore->isValidPath(*maybeOutputPath))
|
||||
continue;
|
||||
valid = false;
|
||||
missing.insert({{outputHash, outputName}, maybeOutputPath});
|
||||
paths.insert({{outputHash, outputName}, maybeOutputPath});
|
||||
}
|
||||
|
||||
auto missing = getMissingRemotePaths(destStore, paths);
|
||||
bool valid = missing.empty();
|
||||
|
||||
/* Try to copy the missing paths from the local store or from
|
||||
substitutes. */
|
||||
if (!missing.empty()) {
|
||||
|
|
|
@ -490,7 +490,6 @@ private:
|
|||
prometheus::Counter& queue_steps_created;
|
||||
prometheus::Counter& queue_checks_early_exits;
|
||||
prometheus::Counter& queue_checks_finished;
|
||||
prometheus::Gauge& queue_max_id;
|
||||
|
||||
prometheus::Counter& dispatcher_time_spent_running;
|
||||
prometheus::Counter& dispatcher_time_spent_waiting;
|
||||
|
@ -546,8 +545,7 @@ private:
|
|||
void queueMonitorLoop(Connection & conn);
|
||||
|
||||
/* Check the queue for new builds. */
|
||||
bool getQueuedBuilds(Connection & conn,
|
||||
nix::ref<nix::Store> destStore, unsigned int & lastBuildId);
|
||||
bool getQueuedBuilds(Connection & conn, nix::ref<nix::Store> destStore);
|
||||
|
||||
/* Handle cancellation, deletion and priority bumps. */
|
||||
void processQueueChange(Connection & conn);
|
||||
|
@ -555,6 +553,12 @@ private:
|
|||
BuildOutput getBuildOutputCached(Connection & conn, nix::ref<nix::Store> destStore,
|
||||
const nix::StorePath & drvPath);
|
||||
|
||||
/* Returns paths missing from the remote store. Paths are processed in
|
||||
* parallel to work around the possible latency of remote stores. */
|
||||
std::map<nix::DrvOutput, std::optional<nix::StorePath>> getMissingRemotePaths(
|
||||
nix::ref<nix::Store> destStore,
|
||||
const std::map<nix::DrvOutput, std::optional<nix::StorePath>> & paths);
|
||||
|
||||
Step::ptr createStep(nix::ref<nix::Store> store,
|
||||
Connection & conn, Build::ptr build, const nix::StorePath & drvPath,
|
||||
Build::ptr referringBuild, Step::ptr referringStep, std::set<nix::StorePath> & finishedDrvs,
|
||||
|
@ -590,10 +594,11 @@ private:
|
|||
retried. */
|
||||
enum StepResult { sDone, sRetry, sMaybeCancelled };
|
||||
StepResult doBuildStep(nix::ref<nix::Store> destStore,
|
||||
MachineReservation::ptr reservation,
|
||||
MachineReservation::ptr & reservation,
|
||||
std::shared_ptr<ActiveStep> activeStep);
|
||||
|
||||
void buildRemote(nix::ref<nix::Store> destStore,
|
||||
MachineReservation::ptr & reservation,
|
||||
Machine::ptr machine, Step::ptr step,
|
||||
const BuildOptions & buildOptions,
|
||||
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,
|
||||
|
|
|
@ -236,6 +236,9 @@ sub serveFile {
|
|||
}
|
||||
|
||||
elsif ($ls->{type} eq "regular") {
|
||||
# Have the hosted data considered its own origin to avoid being a giant
|
||||
# XSS hole.
|
||||
$c->response->header('Content-Security-Policy' => 'sandbox allow-scripts');
|
||||
|
||||
$c->stash->{'plain'} = { data => grab(cmd => ["nix", "--experimental-features", "nix-command",
|
||||
"store", "cat", "--store", getStoreUri(), "$path"]) };
|
||||
|
|
|
@ -2,7 +2,6 @@
|
|||
|
||||
#include <pqxx/pqxx>
|
||||
|
||||
#include "environment-variables.hh"
|
||||
#include "util.hh"
|
||||
|
||||
|
||||
|
@ -18,7 +17,7 @@ struct Connection : pqxx::connection
|
|||
std::string lower_prefix = "dbi:Pg:";
|
||||
std::string upper_prefix = "DBI:Pg:";
|
||||
|
||||
if (hasPrefix(s, lower_prefix) || hasPrefix(s, upper_prefix)) {
|
||||
if (s.starts_with(lower_prefix) || s.starts_with(upper_prefix)) {
|
||||
return concatStringsSep(" ", tokenizeString<Strings>(std::string(s, lower_prefix.size()), ";"));
|
||||
}
|
||||
|
||||
|
|
|
@ -2,7 +2,6 @@
|
|||
|
||||
#include <map>
|
||||
|
||||
#include "file-system.hh"
|
||||
#include "util.hh"
|
||||
|
||||
struct HydraConfig
|
||||
|
|
Loading…
Reference in a new issue