Compare commits


10 commits

Author SHA1 Message Date
leo60228 a053ef8fdf
lix api changes
2024-05-10 15:00:54 -04:00
leo60228 803b8ee731
Revert "Update to Nix 2.19"
This reverts commit c922e73c11.
2024-05-10 14:47:11 -04:00
leo60228 249620b49e
use lix
2024-05-10 12:49:27 -04:00
Pierre Bourdon b8d03adaf4
queue runner: attempt at slightly smarter scheduling criteria
Instead of just going for "whatever is the oldest build we know of",
apply the following criteria first:

- Is the step more constrained? If so, schedule it first to avoid
  filling up "more desirable" build slots with less constrained builds.

- Does the step have more dependents? If so, schedule it first to try
  and maximize open parallelism and breadth of scheduling options.
2024-04-21 17:36:16 +02:00
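The comparator this introduces (see the dispatcher.cc diff below) can be sketched as follows; StepInfo here is a simplified stand-in for the real struct, keeping only the fields that participate in the ordering:

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

struct StepInfo
{
    int highestGlobalPriority;
    double lowestShareUsed;
    int highestLocalPriority;
    std::size_t numRequiredSystemFeatures; // how constrained the step is
    std::size_t numRevDeps;                // how many steps depend on it
    std::uint64_t lowestBuildID;
};

void sortRunnable(std::vector<StepInfo> & runnable)
{
    std::sort(runnable.begin(), runnable.end(),
        [](const StepInfo & a, const StepInfo & b) {
            // Global priority and share usage still dominate, as before.
            if (a.highestGlobalPriority != b.highestGlobalPriority)
                return a.highestGlobalPriority > b.highestGlobalPriority;
            if (a.lowestShareUsed != b.lowestShareUsed)
                return a.lowestShareUsed < b.lowestShareUsed;
            if (a.highestLocalPriority != b.highestLocalPriority)
                return a.highestLocalPriority > b.highestLocalPriority;
            // New: more-constrained steps and steps with more dependents
            // sort earlier; the build ID is only the final tie-breaker.
            if (a.numRequiredSystemFeatures != b.numRequiredSystemFeatures)
                return a.numRequiredSystemFeatures > b.numRequiredSystemFeatures;
            if (a.numRevDeps != b.numRevDeps)
                return a.numRevDeps > b.numRevDeps;
            return a.lowestBuildID < b.lowestBuildID;
        });
}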
Pierre Bourdon ee1a7a7813
web: serveFile: also serve a CSP putting served HTML in its own origin
2024-04-21 16:14:24 +02:00
Pierre Bourdon 5c3e508e55
queue-runner: release machine reservation while copying outputs
This allows for better builder usage when the queue runner is busy. To
avoid running into uncontrollable imbalances between builder/queue
runner, we only release the machine reservation after the local
throttler has found a slot to start copying the outputs for that build.
2024-04-21 01:55:19 +02:00
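A minimal sketch of that ordering, with simplified stand-ins for Hydra's types (Machine, wakeDispatcher and copyOutputsToDestStore are illustrative stubs, not the real API). The point is that a local copy slot is acquired before the machine reservation is dropped:

#include <memory>
#include <semaphore>

struct Machine {};
using MachineReservation = std::shared_ptr<Machine>;

std::counting_semaphore<> localWorkThrottler{4}; // bounded local copy slots

void wakeDispatcher() {}          // stub: nudges the scheduler
void copyOutputsToDestStore() {}  // stub: the slow, throttled part

void finishBuild(MachineReservation & reservation)
{
    // Wait for a copy slot first. If copying is the bottleneck, this blocks
    // while the machine stays reserved, so we don't keep starting builds
    // whose outputs we cannot drain yet.
    localWorkThrottler.acquire();

    reservation.reset(); // machine is now free for the next build
    wakeDispatcher();

    copyOutputsToDestStore();
    localWorkThrottler.release();
}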
Pierre Bourdon 026e3a3103
queue-runner: switch to pseudorandom ordering of builds processing
We don't rely on sequential / monotonic processing of build IDs anymore,
so randomizing actually has the advantage of mixing builds for different
systems together, avoiding the case where one chunk of builds for a
single system is processed while builders for other systems are starved.
2024-04-20 23:05:26 +02:00
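A toy illustration (not Hydra code) of the starvation argument: in ID order, all x86_64-linux builds enqueued first would be handed out before any aarch64-linux build, while shuffling interleaves the systems:

#include <algorithm>
#include <iostream>
#include <random>
#include <string>
#include <utility>
#include <vector>

int main()
{
    std::vector<std::pair<int, std::string>> queue;
    for (int id = 1; id <= 4; ++id) queue.emplace_back(id, "x86_64-linux");
    for (int id = 5; id <= 8; ++id) queue.emplace_back(id, "aarch64-linux");

    // Random order mixes the systems, so neither builder pool sits idle.
    std::shuffle(queue.begin(), queue.end(),
                 std::mt19937{std::random_device{}()});

    for (auto & [id, system] : queue)
        std::cout << "build " << id << " (" << system << ")\n";
}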
Pierre Bourdon 6606a7f86e
queue runner: introduce some parallelism for remote paths lookup
Each output for a given step being ingested is looked up in parallel,
which should multiply the speed of build ingestion by roughly the
average number of outputs per derivation.
2024-04-20 22:28:18 +02:00
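The shape of the change: the real implementation (getMissingRemotePaths in the queue-monitor.cc diff below) uses Nix's ThreadPool, while this sketch substitutes std::async and hypothetical names:

#include <future>
#include <string>
#include <utility>
#include <vector>

using StorePath = std::string;

// Stand-in for a remote-store validity check; in practice each call costs a
// network round-trip, which is why overlapping them pays off.
static bool isValidPathOnRemote(const StorePath & path) { return !path.empty(); }

std::vector<StorePath> missingPaths(const std::vector<StorePath> & outputs)
{
    std::vector<std::pair<StorePath, std::future<bool>>> checks;
    checks.reserve(outputs.size());
    for (auto & path : outputs)
        checks.emplace_back(path, std::async(std::launch::async,
            [&path] { return isValidPathOnRemote(path); }));

    // Total latency approaches one round-trip rather than one per output.
    std::vector<StorePath> missing;
    for (auto & [path, fut] : checks)
        if (!fut.get()) missing.push_back(path);
    return missing;
}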
Pierre Bourdon f31b95d371
queue-runner: reduce the time between queue monitor restarts
This will induce more DB queries (though these are fairly cheap), but
with the benefit of processing priority bumps within 1m instead of 10m.
2024-04-20 16:58:10 +02:00
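The mechanism is the early-exit check at the end of each queue pass (visible in the queue-monitor.cc diff below): a pass now stops after 60 seconds instead of 600, so a priority bump waits at most about a minute for the next pass to pick it up. A sketch of the check:

#include <chrono>

bool shouldEndQueuePass(std::chrono::system_clock::time_point start)
{
    // Was seconds(600): shorter passes mean re-running the (cheap) queue
    // query more often, in exchange for acting on bumps within ~1 minute.
    return std::chrono::system_clock::now() > start + std::chrono::seconds(60);
}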
Pierre Bourdon 54f8daf6b1
queue-runner: remove id > X from new builds query
Running the query with/without it shows that it makes no difference to
postgres, since there's an index on finished=0 already. This allows a
few simplifications, but also paves the way towards running multiple
parallel monitor threads in the future.
2024-04-20 16:53:52 +02:00
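Abbreviated before/after of the new-builds query (full text in the queue-monitor.cc diff below). As the message notes, the partial index on finished = 0 already drives the scan, so the id cursor bought nothing:

// Before: every monitor pass had to thread a lastBuildId cursor through.
const char * before =
    "select ... from Builds where builds.id > $1 and finished = 0 "
    "order by globalPriority desc, builds.id";

// After: stateless, hence friendly to multiple parallel monitor threads.
// (The ordering itself later became random(); see the commit above.)
const char * after =
    "select ... from Builds where finished = 0 "
    "order by globalPriority desc, builds.id";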
16 changed files with 147 additions and 159 deletions

flake.lock

@@ -16,69 +16,37 @@
"type": "github"
}
},
"lowdown-src": {
"flake": false,
"locked": {
"lastModified": 1633514407,
"narHash": "sha256-Dw32tiMjdK9t3ETl5fzGrutQTzh2rufgZV4A/BbxuD4=",
"owner": "kristapsdz",
"repo": "lowdown",
"rev": "d2c2b44ff6c27b936ec27358a2653caaef8f73b8",
"type": "github"
},
"original": {
"owner": "kristapsdz",
"repo": "lowdown",
"type": "github"
}
},
"nix": {
"inputs": {
"flake-compat": "flake-compat",
"lowdown-src": "lowdown-src",
"nixpkgs": [
"nixpkgs"
],
"nixpkgs-regression": "nixpkgs-regression"
"nixpkgs-regression": "nixpkgs-regression",
"pre-commit-hooks": "pre-commit-hooks"
},
"locked": {
"lastModified": 1706208340,
"narHash": "sha256-wNyHUEIiKKVs6UXrUzhP7RSJQv0A8jckgcuylzftl8k=",
"owner": "NixOS",
"repo": "nix",
"rev": "2c4bb93ba5a97e7078896ebc36385ce172960e4e",
"type": "github"
"lastModified": 1714955862,
"narHash": "sha256-REWlo2RYHfJkxnmZTEJu3Cd/2VM+wjjpPy7Xi4BdDTQ=",
"ref": "refs/tags/2.90-beta.1",
"rev": "b6799ab0374a8e1907a48915d3187e07da41d88c",
"revCount": 15501,
"type": "git",
"url": "https://git@git.lix.systems/lix-project/lix"
},
"original": {
"owner": "NixOS",
"ref": "2.19-maintenance",
"repo": "nix",
"type": "github"
"ref": "refs/tags/2.90-beta.1",
"type": "git",
"url": "https://git@git.lix.systems/lix-project/lix"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1701615100,
"narHash": "sha256-7VI84NGBvlCTduw2aHLVB62NvCiZUlALLqBe5v684Aw=",
"lastModified": 1715218190,
"narHash": "sha256-R98WOBHkk8wIi103JUVQF3ei3oui4HvoZcz9tYOAwlk=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "e9f06adb793d1cca5384907b3b8a4071d5d7cb19",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-23.05",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs-for-fileset": {
"locked": {
"lastModified": 1706098335,
"narHash": "sha256-r3dWjT8P9/Ah5m5ul4WqIWD8muj5F+/gbCdjiNVBKmU=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "a77ab169a83a4175169d78684ddd2e54486ac651",
"rev": "9a9960b98418f8c385f52de3b09a63f9c561427a",
"type": "github"
},
"original": {
@@ -104,11 +72,26 @@
"type": "github"
}
},
"pre-commit-hooks": {
"flake": false,
"locked": {
"lastModified": 1714478972,
"narHash": "sha256-q//cgb52vv81uOuwz1LaXElp3XAe1TqrABXODAEF6Sk=",
"owner": "cachix",
"repo": "git-hooks.nix",
"rev": "2849da033884f54822af194400f8dff435ada242",
"type": "github"
},
"original": {
"owner": "cachix",
"repo": "git-hooks.nix",
"type": "github"
}
},
"root": {
"inputs": {
"nix": "nix",
"nixpkgs": "nixpkgs",
"nixpkgs-for-fileset": "nixpkgs-for-fileset"
"nixpkgs": "nixpkgs"
}
}
},

flake.nix

@@ -1,16 +1,11 @@
{
description = "A Nix-based continuous build system";
inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.05";
inputs.nix.url = "github:NixOS/nix/2.19-maintenance";
inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.11";
inputs.nix.url = "git+https://git@git.lix.systems/lix-project/lix?ref=refs/tags/2.90-beta.1";
inputs.nix.inputs.nixpkgs.follows = "nixpkgs";
# TODO get rid of this once https://github.com/NixOS/nix/pull/9546 is
# merged and we upgrade our Nix, so the main `nixpkgs` input is at least
# 23.11 and has `lib.fileset`.
inputs.nixpkgs-for-fileset.url = "github:NixOS/nixpkgs/nixos-23.11";
outputs = { self, nixpkgs, nix, nixpkgs-for-fileset }:
outputs = { self, nixpkgs, nix }:
let
systems = [ "x86_64-linux" "aarch64-linux" ];
forEachSystem = nixpkgs.lib.genAttrs systems;
@@ -67,7 +62,7 @@
};
hydra = final.callPackage ./package.nix {
inherit (nixpkgs-for-fileset.lib) fileset;
inherit (final.lib) fileset;
rawSrc = self;
};
};

src/hydra-eval-jobs/hydra-eval-jobs.cc

@@ -9,7 +9,6 @@
#include "eval-inline.hh"
#include "eval-settings.hh"
#include "signals.hh"
#include "terminal.hh"
#include "util.hh"
#include "get-drvs.hh"
#include "globals.hh"
@@ -97,7 +96,7 @@ static std::string queryMetaStrings(EvalState & state, DrvInfo & drv, const std:
rec = [&](Value & v) {
state.forceValue(v, noPos);
if (v.type() == nString)
res.emplace_back(v.string_view());
res.push_back(v.string.s);
else if (v.isList())
for (unsigned int n = 0; n < v.listSize(); ++n)
rec(*v.listElems()[n]);
@@ -162,7 +161,7 @@ static void worker(
auto s = readLine(from.get());
if (s == "exit") break;
if (!hasPrefix(s, "do ")) abort();
if (!s.starts_with("do ")) abort();
std::string attrPath(s, 3);
debug("worker process %d at '%s'", getpid(), attrPath);
@@ -185,7 +184,7 @@ static void worker(
!experimentalFeatureSettings.isEnabled(Xp::CaDerivations));
if (drv->querySystem() == "unknown")
throw EvalError("derivation must have a 'system' attribute");
state.error<EvalError>("derivation must have a 'system' attribute").debugThrow();
auto drvPath = state.store->printStorePath(drv->requireDrvPath());
@@ -208,7 +207,7 @@
if (a && state.forceBool(*a->value, a->pos, "while evaluating the `_hydraAggregate` attribute")) {
auto a = v->attrs->get(state.symbols.create("constituents"));
if (!a)
throw EvalError("derivation must have a constituents attribute");
state.error<EvalError>("derivation must have a constituents attribute").debugThrow();
NixStringContext context;
state.coerceToString(a->pos, *a->value, context, "while evaluating the `constituents` attribute", true, false);
@@ -228,7 +227,7 @@
auto v = a->value->listElems()[n];
state.forceValue(*v, noPos);
if (v->type() == nString)
job["namedConstituents"].push_back(v->string_view());
job["namedConstituents"].push_back(v->str());
}
}
@@ -274,7 +273,7 @@
else if (v->type() == nNull)
;
else throw TypeError("attribute '%s' is %s, which is not supported", attrPath, showType(*v));
else state.error<TypeError>("attribute '%s' is %s, which is not supported", attrPath, showType(*v)).debugThrow();
} catch (EvalError & e) {
auto msg = e.msg();
@@ -381,8 +380,7 @@ int main(int argc, char * * argv)
// what's shown in the Hydra UI.
writeLine(to->get(), "restart");
}
},
ProcessOptions { .allowVfork = false });
});
from = std::move(fromPipe.readSide);
to = std::move(toPipe.writeSide);
debug("created worker process %d", pid);
@@ -533,7 +531,7 @@ int main(int argc, char * * argv)
if (brokenJobs.empty()) {
std::string drvName(drvPath.name());
assert(hasSuffix(drvName, drvExtension));
assert(drvName.ends_with(drvExtension));
drvName.resize(drvName.size() - drvExtension.size());
auto hashModulo = hashDerivationModulo(*store, drv, true);

src/hydra-evaluator/hydra-evaluator.cc

@@ -12,7 +12,10 @@
#include <sys/types.h>
#include <sys/wait.h>
#include <boost/format.hpp>
using namespace nix;
using boost::format;
typedef std::pair<std::string, std::string> JobsetName;
@@ -506,7 +509,7 @@ int main(int argc, char * * argv)
parseCmdLine(argc, argv, [&](Strings::iterator & arg, const Strings::iterator & end) {
if (*arg == "--unlock")
unlock = true;
else if (hasPrefix(*arg, "-"))
else if (arg->starts_with("-"))
return false;
args.push_back(*arg);
return true;

src/hydra-queue-runner/build-remote.cc

@@ -9,8 +9,6 @@
#include "path.hh"
#include "serve-protocol.hh"
#include "state.hh"
#include "current-process.hh"
#include "processes.hh"
#include "util.hh"
#include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
@@ -95,11 +93,11 @@ static void openConnection(::Machine::ptr machine, Path tmpDir, int stderrFD, SS
throw SysError("cannot start %s", pgmName);
});
to.readSide = -1;
from.writeSide = -1;
to.readSide.reset();
from.writeSide.reset();
child.in = to.writeSide.release();
child.out = from.readSide.release();
child.in = AutoCloseFD{to.writeSide.release()};
child.out = AutoCloseFD{from.readSide.release()};
// XXX: determine the actual max value we can use from /proc.
int pipesize = 1024 * 1024;
@@ -188,7 +186,7 @@ static std::pair<Path, AutoCloseFD> openLogFile(const std::string & logDir, cons
createDirs(dirOf(logFile));
AutoCloseFD logFD = open(logFile.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0666);
AutoCloseFD logFD{open(logFile.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0666)};
if (!logFD) throw SysError("creating log file %s", logFile);
return {std::move(logFile), std::move(logFD)};
@@ -506,6 +504,7 @@
};
void State::buildRemote(ref<Store> destStore,
MachineReservation::ptr & reservation,
::Machine::ptr machine, Step::ptr step,
const BuildOptions & buildOptions,
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,
@@ -589,7 +588,7 @@ void State::buildRemote(ref<Store> destStore,
if (ftruncate(logFD.get(), 0) == -1)
throw SysError("truncating log file %s", result.logFile);
logFD = -1;
logFD.reset();
/* Do the build. */
printMsg(lvlDebug, "building %s on %s",
@@ -630,6 +629,15 @@ void State::buildRemote(ref<Store> destStore,
}
SemaphoreReleaser releaser(&localWorkThrottler);
/* Once we've started copying outputs, release the machine reservation
* so further builds can happen. We do not release the machine earlier
* to avoid situations where the queue runner is bottlenecked on
* copying outputs and we end up building too many things that we
* haven't been able to allow copy slots for. */
assert(reservation.unique());
reservation = 0;
wakeDispatcher();
StorePathSet outputs;
for (auto & [_, realisation] : buildResult.builtOutputs)
outputs.insert(realisation.outPath);
@@ -676,7 +684,7 @@ void State::buildRemote(ref<Store> destStore,
}
/* Shut down the connection. */
child.in = -1;
child.in.reset();
child.sshPid.wait();
} catch (Error & e) {

src/hydra-queue-runner/build-result.cc

@@ -1,7 +1,7 @@
#include "hydra-build-result.hh"
#include "store-api.hh"
#include "util.hh"
#include "source-accessor.hh"
#include "fs-accessor.hh"
#include <regex>
@@ -63,7 +63,7 @@ BuildOutput getBuildOutput(
auto productsFile = narMembers.find(outputS + "/nix-support/hydra-build-products");
if (productsFile == narMembers.end() ||
productsFile->second.type != SourceAccessor::Type::tRegular)
productsFile->second.type != FSAccessor::Type::tRegular)
continue;
assert(productsFile->second.contents);
@@ -94,7 +94,7 @@ BuildOutput getBuildOutput(
product.name = product.path == store->printStorePath(output) ? "" : baseNameOf(product.path);
if (file->second.type == SourceAccessor::Type::tRegular) {
if (file->second.type == FSAccessor::Type::tRegular) {
product.isRegular = true;
product.fileSize = file->second.fileSize.value();
product.sha256hash = file->second.sha256.value();
@@ -116,7 +116,7 @@ BuildOutput getBuildOutput(
auto file = narMembers.find(product.path);
assert(file != narMembers.end());
if (file->second.type == SourceAccessor::Type::tDirectory)
if (file->second.type == FSAccessor::Type::tDirectory)
res.products.push_back(product);
}
}
@@ -125,7 +125,7 @@ BuildOutput getBuildOutput(
for (auto & output : outputs) {
auto file = narMembers.find(store->printStorePath(output) + "/nix-support/hydra-release-name");
if (file == narMembers.end() ||
file->second.type != SourceAccessor::Type::tRegular)
file->second.type != FSAccessor::Type::tRegular)
continue;
res.releaseName = trim(file->second.contents.value());
// FIXME: validate release name
@@ -135,7 +135,7 @@ BuildOutput getBuildOutput(
for (auto & output : outputs) {
auto file = narMembers.find(store->printStorePath(output) + "/nix-support/hydra-metrics");
if (file == narMembers.end() ||
file->second.type != SourceAccessor::Type::tRegular)
file->second.type != FSAccessor::Type::tRegular)
continue;
for (auto & line : tokenizeString<Strings>(file->second.contents.value(), "\n")) {
auto fields = tokenizeString<std::vector<std::string>>(line);

src/hydra-queue-runner/builder.cc

@@ -46,10 +46,12 @@ void State::builder(MachineReservation::ptr reservation)
}
}
/* Release the machine and wake up the dispatcher. */
/* If the machine hasn't been released yet, release and wake up the dispatcher. */
if (reservation) {
assert(reservation.unique());
reservation = 0;
wakeDispatcher();
}
/* If there was a temporary failure, retry the step after an
exponentially increasing interval. */
@@ -72,11 +74,11 @@ void State::builder(MachineReservation::ptr reservation)
State::StepResult State::doBuildStep(nix::ref<Store> destStore,
MachineReservation::ptr reservation,
MachineReservation::ptr & reservation,
std::shared_ptr<ActiveStep> activeStep)
{
auto & step(reservation->step);
auto & machine(reservation->machine);
auto step(reservation->step);
auto machine(reservation->machine);
{
auto step_(step->state.lock());
@@ -208,7 +210,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
try {
/* FIXME: referring builds may have conflicting timeouts. */
buildRemote(destStore, machine, step, buildOptions, result, activeStep, updateStep, narMembers);
buildRemote(destStore, reservation, machine, step, buildOptions, result, activeStep, updateStep, narMembers);
} catch (Error & e) {
if (activeStep->state_.lock()->cancelled) {
printInfo("marking step %d of build %d as cancelled", stepNr, buildId);

src/hydra-queue-runner/dispatcher.cc

@@ -133,6 +133,8 @@ system_time State::doDispatch()
comparator is a partial ordering (see MachineInfo). */
int highestGlobalPriority;
int highestLocalPriority;
size_t numRequiredSystemFeatures;
size_t numRevDeps;
BuildID lowestBuildID;
StepInfo(Step::ptr step, Step::State & step_) : step(step)
@@ -141,6 +143,8 @@ system_time State::doDispatch()
lowestShareUsed = std::min(lowestShareUsed, jobset->shareUsed());
highestGlobalPriority = step_.highestGlobalPriority;
highestLocalPriority = step_.highestLocalPriority;
numRequiredSystemFeatures = step->requiredSystemFeatures.size();
numRevDeps = step_.rdeps.size();
lowestBuildID = step_.lowestBuildID;
}
};
@@ -193,6 +197,8 @@ system_time State::doDispatch()
a.highestGlobalPriority != b.highestGlobalPriority ? a.highestGlobalPriority > b.highestGlobalPriority :
a.lowestShareUsed != b.lowestShareUsed ? a.lowestShareUsed < b.lowestShareUsed :
a.highestLocalPriority != b.highestLocalPriority ? a.highestLocalPriority > b.highestLocalPriority :
a.numRequiredSystemFeatures != b.numRequiredSystemFeatures ? a.numRequiredSystemFeatures > b.numRequiredSystemFeatures :
a.numRevDeps != b.numRevDeps ? a.numRevDeps > b.numRevDeps :
a.lowestBuildID < b.lowestBuildID;
});

src/hydra-queue-runner/hydra-queue-runner.cc

@@ -15,7 +15,6 @@
#include "state.hh"
#include "hydra-build-result.hh"
#include "store-api.hh"
#include "remote-store.hh"
#include "globals.hh"
#include "hydra-config.hh"
@@ -70,13 +69,6 @@ State::PromMetrics::PromMetrics()
.Register(*registry)
.Add({})
)
, queue_max_id(
prometheus::BuildGauge()
.Name("hydraqueuerunner_queue_max_build_id_info")
.Help("Maximum build record ID in the queue")
.Register(*registry)
.Add({})
)
, dispatcher_time_spent_running(
prometheus::BuildCounter()
.Name("hydraqueuerunner_dispatcher_time_spent_running")
@@ -542,7 +534,7 @@ void State::markSucceededBuild(pqxx::work & txn, Build::ptr build,
product.type,
product.subtype,
product.fileSize ? std::make_optional(*product.fileSize) : std::nullopt,
product.sha256hash ? std::make_optional(product.sha256hash->to_string(HashFormat::Base16, false)) : std::nullopt,
product.sha256hash ? std::make_optional(product.sha256hash->to_string(Base16, false)) : std::nullopt,
product.path,
product.name,
product.defaultPath);
@@ -938,20 +930,6 @@ void State::run(BuildID buildOne)
}
}).detach();
/* Make sure that old daemon connections are closed even when
we're not doing much. */
std::thread([&]() {
while (true) {
sleep(10);
try {
if (auto remoteStore = getDestStore().dynamic_pointer_cast<RemoteStore>())
remoteStore->flushBadConnections();
} catch (std::exception & e) {
printMsg(lvlError, "connection flush thread: %s", e.what());
}
}
}).detach();
/* Monitor the database for status dump requests (e.g. from
hydra-queue-runner --status). */
while (true) {

src/hydra-queue-runner/nar-extractor.cc

@@ -24,13 +24,13 @@ struct Extractor : ParseSink
void createDirectory(const Path & path) override
{
members.insert_or_assign(prefix + path, NarMemberData { .type = SourceAccessor::Type::tDirectory });
members.insert_or_assign(prefix + path, NarMemberData { .type = FSAccessor::Type::tDirectory });
}
void createRegularFile(const Path & path) override
{
curMember = &members.insert_or_assign(prefix + path, NarMemberData {
.type = SourceAccessor::Type::tRegular,
.type = FSAccessor::Type::tRegular,
.fileSize = 0,
.contents = filesToKeep.count(path) ? std::optional("") : std::nullopt,
}).first->second;
@@ -66,14 +66,8 @@ struct Extractor : ParseSink
void createSymlink(const Path & path, const std::string & target) override
{
members.insert_or_assign(prefix + path, NarMemberData { .type = SourceAccessor::Type::tSymlink });
members.insert_or_assign(prefix + path, NarMemberData { .type = FSAccessor::Type::tSymlink });
}
void isExecutable() override
{ }
void closeRegularFile() override
{ }
};

src/hydra-queue-runner/nar-extractor.hh

@@ -1,13 +1,13 @@
#pragma once
#include "source-accessor.hh"
#include "fs-accessor.hh"
#include "types.hh"
#include "serialise.hh"
#include "hash.hh"
struct NarMemberData
{
nix::SourceAccessor::Type type;
nix::FSAccessor::Type type;
std::optional<uint64_t> fileSize;
std::optional<std::string> contents;
std::optional<nix::Hash> sha256;

src/hydra-queue-runner/queue-monitor.cc

@@ -1,6 +1,7 @@
#include "state.hh"
#include "hydra-build-result.hh"
#include "globals.hh"
#include "thread-pool.hh"
#include <cstring>
@@ -37,15 +38,13 @@ void State::queueMonitorLoop(Connection & conn)
auto destStore = getDestStore();
unsigned int lastBuildId = 0;
bool quit = false;
while (!quit) {
auto t_before_work = std::chrono::steady_clock::now();
localStore->clearPathInfoCache();
bool done = getQueuedBuilds(conn, destStore, lastBuildId);
bool done = getQueuedBuilds(conn, destStore);
if (buildOne && buildOneDone) quit = true;
@@ -63,12 +62,10 @@ void State::queueMonitorLoop(Connection & conn)
conn.get_notifs();
if (auto lowestId = buildsAdded.get()) {
lastBuildId = std::min(lastBuildId, static_cast<unsigned>(std::stoul(*lowestId) - 1));
printMsg(lvlTalkative, "got notification: new builds added to the queue");
}
if (buildsRestarted.get()) {
printMsg(lvlTalkative, "got notification: builds restarted");
lastBuildId = 0; // check all builds
}
if (buildsCancelled.get() || buildsDeleted.get() || buildsBumped.get()) {
printMsg(lvlTalkative, "got notification: builds cancelled or bumped");
@@ -95,20 +92,18 @@ struct PreviousFailure : public std::exception {
bool State::getQueuedBuilds(Connection & conn,
ref<Store> destStore, unsigned int & lastBuildId)
ref<Store> destStore)
{
prom.queue_checks_started.Increment();
printInfo("checking the queue for builds > %d...", lastBuildId);
printInfo("checking the queue for builds...");
/* Grab the queued builds from the database, but don't process
them yet (since we don't want a long-running transaction). */
std::vector<BuildID> newIDs;
std::map<BuildID, Build::ptr> newBuildsByID;
std::unordered_map<BuildID, Build::ptr> newBuildsByID;
std::multimap<StorePath, BuildID> newBuildsByPath;
unsigned int newLastBuildId = lastBuildId;
{
pqxx::work txn(conn);
@@ -117,17 +112,12 @@ bool State::getQueuedBuilds(Connection & conn,
"jobsets.name as jobset, job, drvPath, maxsilent, timeout, timestamp, "
"globalPriority, priority from Builds "
"inner join jobsets on builds.jobset_id = jobsets.id "
"where builds.id > $1 and finished = 0 order by globalPriority desc, builds.id",
lastBuildId);
"where finished = 0 order by globalPriority desc, random()");
for (auto const & row : res) {
auto builds_(builds.lock());
BuildID id = row["id"].as<BuildID>();
if (buildOne && id != buildOne) continue;
if (id > newLastBuildId) {
newLastBuildId = id;
prom.queue_max_id.Set(id);
}
if (builds_->count(id)) continue;
auto build = std::make_shared<Build>(
@@ -309,7 +299,7 @@ bool State::getQueuedBuilds(Connection & conn,
try {
createBuild(build);
} catch (Error & e) {
e.addTrace({}, hintfmt("while loading build %d: ", build->id));
e.addTrace({}, HintFmt("while loading build %d: ", build->id));
throw;
}
@@ -329,15 +319,13 @@ bool State::getQueuedBuilds(Connection & conn,
/* Stop after a certain time to allow priority bumps to be
processed. */
if (std::chrono::system_clock::now() > start + std::chrono::seconds(600)) {
if (std::chrono::system_clock::now() > start + std::chrono::seconds(60)) {
prom.queue_checks_early_exits.Increment();
break;
}
}
prom.queue_checks_finished.Increment();
lastBuildId = newBuildsByID.empty() ? newLastBuildId : newBuildsByID.begin()->first - 1;
return newBuildsByID.empty();
}
@@ -416,6 +404,34 @@ void State::processQueueChange(Connection & conn)
}
std::map<DrvOutput, std::optional<StorePath>> State::getMissingRemotePaths(
ref<Store> destStore,
const std::map<DrvOutput, std::optional<StorePath>> & paths)
{
Sync<std::map<DrvOutput, std::optional<StorePath>>> missing_;
ThreadPool tp;
for (auto & [output, maybeOutputPath] : paths) {
if (!maybeOutputPath) {
auto missing(missing_.lock());
missing->insert({output, maybeOutputPath});
} else {
tp.enqueue([&] {
if (!destStore->isValidPath(*maybeOutputPath)) {
auto missing(missing_.lock());
missing->insert({output, maybeOutputPath});
}
});
}
}
tp.process();
auto missing(missing_.lock());
return *missing;
}
Step::ptr State::createStep(ref<Store> destStore,
Connection & conn, Build::ptr build, const StorePath & drvPath,
Build::ptr referringBuild, Step::ptr referringStep, std::set<StorePath> & finishedDrvs,
@@ -496,16 +512,15 @@ Step::ptr State::createStep(ref<Store> destStore,
/* Are all outputs valid? */
auto outputHashes = staticOutputHashes(*localStore, *(step->drv));
bool valid = true;
std::map<DrvOutput, std::optional<StorePath>> missing;
std::map<DrvOutput, std::optional<StorePath>> paths;
for (auto & [outputName, maybeOutputPath] : destStore->queryPartialDerivationOutputMap(drvPath, &*localStore)) {
auto outputHash = outputHashes.at(outputName);
if (maybeOutputPath && destStore->isValidPath(*maybeOutputPath))
continue;
valid = false;
missing.insert({{outputHash, outputName}, maybeOutputPath});
paths.insert({{outputHash, outputName}, maybeOutputPath});
}
auto missing = getMissingRemotePaths(destStore, paths);
bool valid = missing.empty();
/* Try to copy the missing paths from the local store or from
substitutes. */
if (!missing.empty()) {

src/hydra-queue-runner/state.hh

@@ -490,7 +490,6 @@ private:
prometheus::Counter& queue_steps_created;
prometheus::Counter& queue_checks_early_exits;
prometheus::Counter& queue_checks_finished;
prometheus::Gauge& queue_max_id;
prometheus::Counter& dispatcher_time_spent_running;
prometheus::Counter& dispatcher_time_spent_waiting;
@@ -546,8 +545,7 @@ private:
void queueMonitorLoop(Connection & conn);
/* Check the queue for new builds. */
bool getQueuedBuilds(Connection & conn,
nix::ref<nix::Store> destStore, unsigned int & lastBuildId);
bool getQueuedBuilds(Connection & conn, nix::ref<nix::Store> destStore);
/* Handle cancellation, deletion and priority bumps. */
void processQueueChange(Connection & conn);
@@ -555,6 +553,12 @@ private:
BuildOutput getBuildOutputCached(Connection & conn, nix::ref<nix::Store> destStore,
const nix::StorePath & drvPath);
/* Returns paths missing from the remote store. Paths are processed in
* parallel to work around the possible latency of remote stores. */
std::map<nix::DrvOutput, std::optional<nix::StorePath>> getMissingRemotePaths(
nix::ref<nix::Store> destStore,
const std::map<nix::DrvOutput, std::optional<nix::StorePath>> & paths);
Step::ptr createStep(nix::ref<nix::Store> store,
Connection & conn, Build::ptr build, const nix::StorePath & drvPath,
Build::ptr referringBuild, Step::ptr referringStep, std::set<nix::StorePath> & finishedDrvs,
@@ -590,10 +594,11 @@ private:
retried. */
enum StepResult { sDone, sRetry, sMaybeCancelled };
StepResult doBuildStep(nix::ref<nix::Store> destStore,
MachineReservation::ptr reservation,
MachineReservation::ptr & reservation,
std::shared_ptr<ActiveStep> activeStep);
void buildRemote(nix::ref<nix::Store> destStore,
MachineReservation::ptr & reservation,
Machine::ptr machine, Step::ptr step,
const BuildOptions & buildOptions,
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,

src/lib/Hydra/Controller/Build.pm

@@ -236,6 +236,9 @@ sub serveFile {
}
elsif ($ls->{type} eq "regular") {
# Have the hosted data considered its own origin to avoid being a giant
# XSS hole.
$c->response->header('Content-Security-Policy' => 'sandbox allow-scripts');
$c->stash->{'plain'} = { data => grab(cmd => ["nix", "--experimental-features", "nix-command",
"store", "cat", "--store", getStoreUri(), "$path"]) };

src/libhydra/db.hh

@@ -2,7 +2,6 @@
#include <pqxx/pqxx>
#include "environment-variables.hh"
#include "util.hh"
@@ -18,7 +17,7 @@ struct Connection : pqxx::connection
std::string lower_prefix = "dbi:Pg:";
std::string upper_prefix = "DBI:Pg:";
if (hasPrefix(s, lower_prefix) || hasPrefix(s, upper_prefix)) {
if (s.starts_with(lower_prefix) || s.starts_with(upper_prefix)) {
return concatStringsSep(" ", tokenizeString<Strings>(std::string(s, lower_prefix.size()), ";"));
}

src/libhydra/hydra-config.hh

@@ -2,7 +2,6 @@
#include <map>
#include "file-system.hh"
#include "util.hh"
struct HydraConfig