forked from lix-project/hydra
Update to reflect BinaryCacheStore changes
BinaryCacheStore no longer implements buildPaths() and ensurePath(), so we need to use copyPath() / copyClosure().
This commit is contained in:
parent
fb0d2d2dda
commit
ee2e9f5335
|
@ -230,6 +230,7 @@ in
|
||||||
hydra_logo ${cfg.logo}
|
hydra_logo ${cfg.logo}
|
||||||
''}
|
''}
|
||||||
gc_roots_dir ${cfg.gcRootsDir}
|
gc_roots_dir ${cfg.gcRootsDir}
|
||||||
|
use-substitutes ${if cfg.useSubstitutes then "1" else "0"}
|
||||||
'';
|
'';
|
||||||
|
|
||||||
environment.systemPackages = [ cfg.package ];
|
environment.systemPackages = [ cfg.package ];
|
||||||
|
@ -325,7 +326,7 @@ in
|
||||||
IN_SYSTEMD = "1"; # to get log severity levels
|
IN_SYSTEMD = "1"; # to get log severity levels
|
||||||
};
|
};
|
||||||
serviceConfig =
|
serviceConfig =
|
||||||
{ ExecStart = "@${cfg.package}/bin/hydra-queue-runner hydra-queue-runner -v --option build-use-substitutes ${if cfg.useSubstitutes then "true" else "false"}";
|
{ ExecStart = "@${cfg.package}/bin/hydra-queue-runner hydra-queue-runner -v";
|
||||||
ExecStopPost = "${cfg.package}/bin/hydra-queue-runner --unlock";
|
ExecStopPost = "${cfg.package}/bin/hydra-queue-runner --unlock";
|
||||||
User = "hydra-queue-runner";
|
User = "hydra-queue-runner";
|
||||||
Restart = "always";
|
Restart = "always";
|
||||||
|
|
|
@ -202,7 +202,7 @@ void State::buildRemote(ref<Store> destStore,
|
||||||
a no-op for regular stores, but for the binary cache store,
|
a no-op for regular stores, but for the binary cache store,
|
||||||
this will copy the inputs to the binary cache from the local
|
this will copy the inputs to the binary cache from the local
|
||||||
store. */
|
store. */
|
||||||
destStore->buildPaths(basicDrv.inputSrcs);
|
copyClosure(ref<Store>(localStore), destStore, basicDrv.inputSrcs);
|
||||||
|
|
||||||
/* Copy the input closure. */
|
/* Copy the input closure. */
|
||||||
if (/* machine->sshName != "localhost" */ true) {
|
if (/* machine->sshName != "localhost" */ true) {
|
||||||
|
|
|
@ -71,12 +71,6 @@ MaintainCount State::startDbUpdate()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
ref<Store> State::getLocalStore()
|
|
||||||
{
|
|
||||||
return ref<Store>(_localStore);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
ref<Store> State::getDestStore()
|
ref<Store> State::getDestStore()
|
||||||
{
|
{
|
||||||
return ref<Store>(_destStore);
|
return ref<Store>(_destStore);
|
||||||
|
@ -797,14 +791,20 @@ void State::run(BuildID buildOne)
|
||||||
if (!lock)
|
if (!lock)
|
||||||
throw Error("hydra-queue-runner is already running");
|
throw Error("hydra-queue-runner is already running");
|
||||||
|
|
||||||
_localStore = openStore();
|
localStore = openStore();
|
||||||
|
|
||||||
if (hydraConfig["store_uri"] == "") {
|
if (hydraConfig["store_uri"] == "") {
|
||||||
_destStore = _localStore;
|
_destStore = localStore;
|
||||||
} else {
|
} else {
|
||||||
_destStore = openStoreAt(hydraConfig["store_uri"]);
|
_destStore = openStoreAt(hydraConfig["store_uri"]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto isTrue = [](const std::string & s) {
|
||||||
|
return s == "1" || s == "true";
|
||||||
|
};
|
||||||
|
|
||||||
|
useSubstitutes = isTrue(hydraConfig["use-substitutes"]);
|
||||||
|
|
||||||
{
|
{
|
||||||
auto conn(dbPool.get());
|
auto conn(dbPool.get());
|
||||||
clearBusy(*conn, 0);
|
clearBusy(*conn, 0);
|
||||||
|
|
|
@ -29,13 +29,12 @@ void State::queueMonitorLoop()
|
||||||
receiver buildsBumped(*conn, "builds_bumped");
|
receiver buildsBumped(*conn, "builds_bumped");
|
||||||
receiver jobsetSharesChanged(*conn, "jobset_shares_changed");
|
receiver jobsetSharesChanged(*conn, "jobset_shares_changed");
|
||||||
|
|
||||||
auto localStore = getLocalStore();
|
|
||||||
auto destStore = getDestStore();
|
auto destStore = getDestStore();
|
||||||
|
|
||||||
unsigned int lastBuildId = 0;
|
unsigned int lastBuildId = 0;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
bool done = getQueuedBuilds(*conn, localStore, destStore, lastBuildId);
|
bool done = getQueuedBuilds(*conn, destStore, lastBuildId);
|
||||||
|
|
||||||
/* Sleep until we get notification from the database about an
|
/* Sleep until we get notification from the database about an
|
||||||
event. */
|
event. */
|
||||||
|
@ -69,7 +68,7 @@ struct PreviousFailure : public std::exception {
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
bool State::getQueuedBuilds(Connection & conn, ref<Store> localStore,
|
bool State::getQueuedBuilds(Connection & conn,
|
||||||
ref<Store> destStore, unsigned int & lastBuildId)
|
ref<Store> destStore, unsigned int & lastBuildId)
|
||||||
{
|
{
|
||||||
printMsg(lvlInfo, format("checking the queue for builds > %1%...") % lastBuildId);
|
printMsg(lvlInfo, format("checking the queue for builds > %1%...") % lastBuildId);
|
||||||
|
@ -415,29 +414,44 @@ Step::ptr State::createStep(ref<Store> destStore,
|
||||||
bool valid = true;
|
bool valid = true;
|
||||||
PathSet outputs = step->drv.outputPaths();
|
PathSet outputs = step->drv.outputPaths();
|
||||||
DerivationOutputs missing;
|
DerivationOutputs missing;
|
||||||
PathSet missingPaths;
|
|
||||||
for (auto & i : step->drv.outputs)
|
for (auto & i : step->drv.outputs)
|
||||||
if (!destStore->isValidPath(i.second.path)) {
|
if (!destStore->isValidPath(i.second.path)) {
|
||||||
valid = false;
|
valid = false;
|
||||||
missing[i.first] = i.second;
|
missing[i.first] = i.second;
|
||||||
missingPaths.insert(i.second.path);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Try to substitute the missing paths. Note: can't use the more
|
/* Try to copy the missing paths from the local store or from
|
||||||
efficient querySubstitutablePaths() here because upstream Hydra
|
substitutes. */
|
||||||
servers don't allow it (they have "WantMassQuery: 0"). */
|
if (!missing.empty()) {
|
||||||
assert(missing.size() == missingPaths.size());
|
|
||||||
if (!missing.empty() && settings.useSubstitutes) {
|
size_t avail = 0;
|
||||||
|
for (auto & i : missing) {
|
||||||
|
if (/* localStore != destStore && */ localStore->isValidPath(i.second.path))
|
||||||
|
avail++;
|
||||||
|
else if (useSubstitutes) {
|
||||||
SubstitutablePathInfos infos;
|
SubstitutablePathInfos infos;
|
||||||
destStore->querySubstitutablePathInfos(missingPaths, infos); // FIXME
|
localStore->querySubstitutablePathInfos({i.second.path}, infos);
|
||||||
if (infos.size() == missingPaths.size()) {
|
if (infos.size() == 1)
|
||||||
|
avail++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (missing.size() == avail) {
|
||||||
valid = true;
|
valid = true;
|
||||||
for (auto & i : missing) {
|
for (auto & i : missing) {
|
||||||
try {
|
try {
|
||||||
printMsg(lvlInfo, format("substituting output ‘%1%’ of ‘%2%’") % i.second.path % drvPath);
|
|
||||||
|
|
||||||
time_t startTime = time(0);
|
time_t startTime = time(0);
|
||||||
destStore->ensurePath(i.second.path);
|
|
||||||
|
if (localStore->isValidPath(i.second.path))
|
||||||
|
printInfo("copying output ‘%1%’ of ‘%2%’ from local store", i.second.path, drvPath);
|
||||||
|
else {
|
||||||
|
printInfo("substituting output ‘%1%’ of ‘%2%’", i.second.path, drvPath);
|
||||||
|
localStore->ensurePath(i.second.path);
|
||||||
|
// FIXME: should copy directly from substituter to destStore.
|
||||||
|
}
|
||||||
|
|
||||||
|
copyClosure(ref<Store>(localStore), destStore, {i.second.path});
|
||||||
|
|
||||||
time_t stopTime = time(0);
|
time_t stopTime = time(0);
|
||||||
|
|
||||||
{
|
{
|
||||||
|
@ -448,6 +462,7 @@ Step::ptr State::createStep(ref<Store> destStore,
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (Error & e) {
|
} catch (Error & e) {
|
||||||
|
printError("while copying/substituting output ‘%s’ of ‘%s’: %s", i.second.path, drvPath, e.what());
|
||||||
valid = false;
|
valid = false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -257,6 +257,8 @@ private:
|
||||||
|
|
||||||
std::map<std::string, std::string> hydraConfig;
|
std::map<std::string, std::string> hydraConfig;
|
||||||
|
|
||||||
|
bool useSubstitutes = false;
|
||||||
|
|
||||||
/* The queued builds. */
|
/* The queued builds. */
|
||||||
typedef std::map<BuildID, Build::ptr> Builds;
|
typedef std::map<BuildID, Build::ptr> Builds;
|
||||||
nix::Sync<Builds> builds;
|
nix::Sync<Builds> builds;
|
||||||
|
@ -374,7 +376,7 @@ private:
|
||||||
|
|
||||||
std::atomic<time_t> lastDispatcherCheck{0};
|
std::atomic<time_t> lastDispatcherCheck{0};
|
||||||
|
|
||||||
std::shared_ptr<nix::Store> _localStore;
|
std::shared_ptr<nix::Store> localStore;
|
||||||
std::shared_ptr<nix::Store> _destStore;
|
std::shared_ptr<nix::Store> _destStore;
|
||||||
|
|
||||||
/* Token server to prevent threads from allocating too many big
|
/* Token server to prevent threads from allocating too many big
|
||||||
|
@ -401,10 +403,6 @@ private:
|
||||||
|
|
||||||
MaintainCount startDbUpdate();
|
MaintainCount startDbUpdate();
|
||||||
|
|
||||||
/* Return a store object that can access derivations produced by
|
|
||||||
hydra-evaluator. */
|
|
||||||
nix::ref<nix::Store> getLocalStore();
|
|
||||||
|
|
||||||
/* Return a store object to store build results. */
|
/* Return a store object to store build results. */
|
||||||
nix::ref<nix::Store> getDestStore();
|
nix::ref<nix::Store> getDestStore();
|
||||||
|
|
||||||
|
@ -436,7 +434,7 @@ private:
|
||||||
void queueMonitorLoop();
|
void queueMonitorLoop();
|
||||||
|
|
||||||
/* Check the queue for new builds. */
|
/* Check the queue for new builds. */
|
||||||
bool getQueuedBuilds(Connection & conn, nix::ref<nix::Store> localStore,
|
bool getQueuedBuilds(Connection & conn,
|
||||||
nix::ref<nix::Store> destStore, unsigned int & lastBuildId);
|
nix::ref<nix::Store> destStore, unsigned int & lastBuildId);
|
||||||
|
|
||||||
/* Handle cancellation, deletion and priority bumps. */
|
/* Handle cancellation, deletion and priority bumps. */
|
||||||
|
|
Loading…
Reference in a new issue