Update to reflect BinaryCacheStore changes

BinaryCacheStore no longer implements buildPaths() and ensurePath(),
so we need to use copyPath() / copyClosure().
This commit is contained in:
Eelco Dolstra 2016-10-07 20:23:05 +02:00
parent fb0d2d2dda
commit ee2e9f5335
5 changed files with 46 additions and 32 deletions

View file

@ -230,6 +230,7 @@ in
hydra_logo ${cfg.logo}
''}
gc_roots_dir ${cfg.gcRootsDir}
use-substitutes ${if cfg.useSubstitutes then "1" else "0"}
'';
environment.systemPackages = [ cfg.package ];
@ -325,7 +326,7 @@ in
IN_SYSTEMD = "1"; # to get log severity levels
};
serviceConfig =
{ ExecStart = "@${cfg.package}/bin/hydra-queue-runner hydra-queue-runner -v --option build-use-substitutes ${if cfg.useSubstitutes then "true" else "false"}";
{ ExecStart = "@${cfg.package}/bin/hydra-queue-runner hydra-queue-runner -v";
ExecStopPost = "${cfg.package}/bin/hydra-queue-runner --unlock";
User = "hydra-queue-runner";
Restart = "always";

View file

@ -202,7 +202,7 @@ void State::buildRemote(ref<Store> destStore,
a no-op for regular stores, but for the binary cache store,
this will copy the inputs to the binary cache from the local
store. */
destStore->buildPaths(basicDrv.inputSrcs);
copyClosure(ref<Store>(localStore), destStore, basicDrv.inputSrcs);
/* Copy the input closure. */
if (/* machine->sshName != "localhost" */ true) {

View file

@ -71,12 +71,6 @@ MaintainCount State::startDbUpdate()
}
ref<Store> State::getLocalStore()
{
return ref<Store>(_localStore);
}
ref<Store> State::getDestStore()
{
return ref<Store>(_destStore);
@ -797,14 +791,20 @@ void State::run(BuildID buildOne)
if (!lock)
throw Error("hydra-queue-runner is already running");
_localStore = openStore();
localStore = openStore();
if (hydraConfig["store_uri"] == "") {
_destStore = _localStore;
_destStore = localStore;
} else {
_destStore = openStoreAt(hydraConfig["store_uri"]);
}
auto isTrue = [](const std::string & s) {
return s == "1" || s == "true";
};
useSubstitutes = isTrue(hydraConfig["use-substitutes"]);
{
auto conn(dbPool.get());
clearBusy(*conn, 0);

View file

@ -29,13 +29,12 @@ void State::queueMonitorLoop()
receiver buildsBumped(*conn, "builds_bumped");
receiver jobsetSharesChanged(*conn, "jobset_shares_changed");
auto localStore = getLocalStore();
auto destStore = getDestStore();
unsigned int lastBuildId = 0;
while (true) {
bool done = getQueuedBuilds(*conn, localStore, destStore, lastBuildId);
bool done = getQueuedBuilds(*conn, destStore, lastBuildId);
/* Sleep until we get notification from the database about an
event. */
@ -69,7 +68,7 @@ struct PreviousFailure : public std::exception {
};
bool State::getQueuedBuilds(Connection & conn, ref<Store> localStore,
bool State::getQueuedBuilds(Connection & conn,
ref<Store> destStore, unsigned int & lastBuildId)
{
printMsg(lvlInfo, format("checking the queue for builds > %1%...") % lastBuildId);
@ -415,29 +414,44 @@ Step::ptr State::createStep(ref<Store> destStore,
bool valid = true;
PathSet outputs = step->drv.outputPaths();
DerivationOutputs missing;
PathSet missingPaths;
for (auto & i : step->drv.outputs)
if (!destStore->isValidPath(i.second.path)) {
valid = false;
missing[i.first] = i.second;
missingPaths.insert(i.second.path);
}
/* Try to substitute the missing paths. Note: can't use the more
efficient querySubstitutablePaths() here because upstream Hydra
servers don't allow it (they have "WantMassQuery: 0"). */
assert(missing.size() == missingPaths.size());
if (!missing.empty() && settings.useSubstitutes) {
SubstitutablePathInfos infos;
destStore->querySubstitutablePathInfos(missingPaths, infos); // FIXME
if (infos.size() == missingPaths.size()) {
/* Try to copy the missing paths from the local store or from
substitutes. */
if (!missing.empty()) {
size_t avail = 0;
for (auto & i : missing) {
if (/* localStore != destStore && */ localStore->isValidPath(i.second.path))
avail++;
else if (useSubstitutes) {
SubstitutablePathInfos infos;
localStore->querySubstitutablePathInfos({i.second.path}, infos);
if (infos.size() == 1)
avail++;
}
}
if (missing.size() == avail) {
valid = true;
for (auto & i : missing) {
try {
printMsg(lvlInfo, format("substituting output %1% of %2%") % i.second.path % drvPath);
time_t startTime = time(0);
destStore->ensurePath(i.second.path);
if (localStore->isValidPath(i.second.path))
printInfo("copying output %1% of %2% from local store", i.second.path, drvPath);
else {
printInfo("substituting output %1% of %2%", i.second.path, drvPath);
localStore->ensurePath(i.second.path);
// FIXME: should copy directly from substituter to destStore.
}
copyClosure(ref<Store>(localStore), destStore, {i.second.path});
time_t stopTime = time(0);
{
@ -448,6 +462,7 @@ Step::ptr State::createStep(ref<Store> destStore,
}
} catch (Error & e) {
printError("while copying/substituting output %s of %s: %s", i.second.path, drvPath, e.what());
valid = false;
break;
}

View file

@ -257,6 +257,8 @@ private:
std::map<std::string, std::string> hydraConfig;
bool useSubstitutes = false;
/* The queued builds. */
typedef std::map<BuildID, Build::ptr> Builds;
nix::Sync<Builds> builds;
@ -374,7 +376,7 @@ private:
std::atomic<time_t> lastDispatcherCheck{0};
std::shared_ptr<nix::Store> _localStore;
std::shared_ptr<nix::Store> localStore;
std::shared_ptr<nix::Store> _destStore;
/* Token server to prevent threads from allocating too many big
@ -401,10 +403,6 @@ private:
MaintainCount startDbUpdate();
/* Return a store object that can access derivations produced by
hydra-evaluator. */
nix::ref<nix::Store> getLocalStore();
/* Return a store object to store build results. */
nix::ref<nix::Store> getDestStore();
@ -436,7 +434,7 @@ private:
void queueMonitorLoop();
/* Check the queue for new builds. */
bool getQueuedBuilds(Connection & conn, nix::ref<nix::Store> localStore,
bool getQueuedBuilds(Connection & conn,
nix::ref<nix::Store> destStore, unsigned int & lastBuildId);
/* Handle cancellation, deletion and priority bumps. */