From 2d0dd7fb4963119b4ee26ec093d6910fd0d08d23 Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Mon, 15 Feb 2016 21:10:29 +0100 Subject: [PATCH] hydra-queue-runner: Write directly to a binary cache --- src/hydra-queue-runner/Makefile.am | 3 +- src/hydra-queue-runner/build-remote.cc | 32 ++- src/hydra-queue-runner/build-result.cc | 6 + src/hydra-queue-runner/builder.cc | 10 +- src/hydra-queue-runner/hydra-queue-runner.cc | 16 +- src/hydra-queue-runner/local-binary-cache.cc | 267 +++++++++++++++++++ src/hydra-queue-runner/local-binary-cache.hh | 124 +++++++++ src/hydra-queue-runner/queue-monitor.cc | 25 +- src/hydra-queue-runner/state.hh | 14 +- 9 files changed, 463 insertions(+), 34 deletions(-) create mode 100644 src/hydra-queue-runner/local-binary-cache.cc create mode 100644 src/hydra-queue-runner/local-binary-cache.hh diff --git a/src/hydra-queue-runner/Makefile.am b/src/hydra-queue-runner/Makefile.am index 089187d1..21375f1c 100644 --- a/src/hydra-queue-runner/Makefile.am +++ b/src/hydra-queue-runner/Makefile.am @@ -2,7 +2,8 @@ bin_PROGRAMS = hydra-queue-runner hydra_queue_runner_SOURCES = hydra-queue-runner.cc queue-monitor.cc dispatcher.cc \ builder.cc build-result.cc build-remote.cc \ - build-result.hh counter.hh pool.hh sync.hh token-server.hh state.hh db.hh + build-result.hh counter.hh pool.hh sync.hh token-server.hh state.hh db.hh \ + local-binary-cache.hh local-binary-cache.cc hydra_queue_runner_LDADD = $(NIX_LIBS) -lpqxx AM_CXXFLAGS = $(NIX_CFLAGS) -Wall diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index 39d041d9..11490432 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -73,14 +73,14 @@ static void openConnection(Machine::ptr machine, Path tmpDir, int stderrFD, Chil } -static void copyClosureTo(ref store, +static void copyClosureTo(ref destStore, FdSource & from, FdSink & to, const PathSet & paths, counter & bytesSent, bool useSubstitutes = false) { PathSet closure; for (auto & path : paths) - store->computeFSClosure(path, closure); + destStore->computeFSClosure(path, closure); /* Send the "query valid paths" command with the "lock" option enabled. This prevents a race where the remote host @@ -95,7 +95,7 @@ static void copyClosureTo(ref store, if (present.size() == closure.size()) return; - Paths sorted = store->topoSortPaths(closure); + Paths sorted = destStore->topoSortPaths(closure); Paths missing; for (auto i = sorted.rbegin(); i != sorted.rend(); ++i) @@ -104,10 +104,10 @@ static void copyClosureTo(ref store, printMsg(lvlDebug, format("sending %1% missing paths") % missing.size()); for (auto & p : missing) - bytesSent += store->queryPathInfo(p).narSize; + bytesSent += destStore->queryPathInfo(p).narSize; to << cmdImportPaths; - store->exportPaths(missing, false, to); + destStore->exportPaths(missing, false, to); to.flush(); if (readInt(from) != 1) @@ -115,19 +115,19 @@ static void copyClosureTo(ref store, } -static void copyClosureFrom(ref store, +static void copyClosureFrom(ref destStore, FdSource & from, FdSink & to, const PathSet & paths, counter & bytesReceived) { to << cmdExportPaths << 0 << paths; to.flush(); - store->importPaths(false, from); + destStore->importPaths(false, from); for (auto & p : paths) - bytesReceived += store->queryPathInfo(p).narSize; + bytesReceived += destStore->queryPathInfo(p).narSize; } -void State::buildRemote(ref store, +void State::buildRemote(ref destStore, Machine::ptr machine, Step::ptr step, unsigned int maxSilentTime, unsigned int buildTimeout, RemoteResult & result) @@ -222,14 +222,20 @@ void State::buildRemote(ref store, } } + /* Ensure that the inputs exist in the destination store. This is + a no-op for regular stores, but for the binary cache store, + this will copy the inputs to the binary cache from the local + store. */ + destStore->buildPaths(basicDrv.inputSrcs); + /* Copy the input closure. */ - if (machine->sshName != "localhost") { + if (/* machine->sshName != "localhost" */ true) { auto mc1 = std::make_shared(nrStepsWaiting); std::lock_guard sendLock(machine->state->sendLock); mc1.reset(); MaintainCount mc2(nrStepsCopyingTo); printMsg(lvlDebug, format("sending closure of ‘%1%’ to ‘%2%’") % step->drvPath % machine->sshName); - copyClosureTo(store, from, to, inputs, bytesSent); + copyClosureTo(destStore, from, to, inputs, bytesSent); } autoDelete.cancel(); @@ -277,13 +283,13 @@ void State::buildRemote(ref store, } /* Copy the output paths. */ - if (machine->sshName != "localhost") { + if (/* machine->sshName != "localhost" */ true) { printMsg(lvlDebug, format("copying outputs of ‘%1%’ from ‘%2%’") % step->drvPath % machine->sshName); PathSet outputs; for (auto & output : step->drv.outputs) outputs.insert(output.second.path); MaintainCount mc(nrStepsCopyingFrom); - copyClosureFrom(store, from, to, outputs, bytesReceived); + copyClosureFrom(destStore, from, to, outputs, bytesReceived); } /* Shut down the connection. */ diff --git a/src/hydra-queue-runner/build-result.cc b/src/hydra-queue-runner/build-result.cc index ebf07035..f09d13a1 100644 --- a/src/hydra-queue-runner/build-result.cc +++ b/src/hydra-queue-runner/build-result.cc @@ -41,6 +41,7 @@ BuildOutput getBuildOutput(nix::ref store, const Derivation & drv) /* Get build products. */ bool explicitProducts = false; +#if 0 Regex regex( "(([a-zA-Z0-9_-]+)" // type (e.g. "doc") "[[:space:]]+" @@ -97,6 +98,7 @@ BuildOutput getBuildOutput(nix::ref store, const Derivation & drv) res.products.push_back(product); } } +#endif /* If no build products were explicitly declared, then add all outputs as a product of type "nix-build". */ @@ -108,14 +110,17 @@ BuildOutput getBuildOutput(nix::ref store, const Derivation & drv) product.subtype = output.first == "out" ? "" : output.first; product.name = storePathToName(product.path); +#if 0 struct stat st; if (stat(product.path.c_str(), &st)) throw SysError(format("getting status of ‘%1%’") % product.path); if (S_ISDIR(st.st_mode)) +#endif res.products.push_back(product); } } +#if 0 /* Get the release name from $output/nix-support/hydra-release-name. */ for (auto & output : outputs) { Path p = output + "/nix-support/hydra-release-name"; @@ -139,6 +144,7 @@ BuildOutput getBuildOutput(nix::ref store, const Derivation & drv) res.metrics[metric.name] = metric; } } +#endif return res; } diff --git a/src/hydra-queue-runner/builder.cc b/src/hydra-queue-runner/builder.cc index 9d1fd0dc..0c564fa2 100644 --- a/src/hydra-queue-runner/builder.cc +++ b/src/hydra-queue-runner/builder.cc @@ -15,8 +15,8 @@ void State::builder(MachineReservation::ptr reservation) auto step = reservation->step; try { - auto store = openStore(); // FIXME: pool - retry = doBuildStep(store, step, reservation->machine); + auto destStore = getDestStore(); + retry = doBuildStep(destStore, step, reservation->machine); } catch (std::exception & e) { printMsg(lvlError, format("uncaught exception building ‘%1%’ on ‘%2%’: %3%") % step->drvPath % reservation->machine->sshName % e.what()); @@ -45,7 +45,7 @@ void State::builder(MachineReservation::ptr reservation) } -bool State::doBuildStep(nix::ref store, Step::ptr step, +bool State::doBuildStep(nix::ref destStore, Step::ptr step, Machine::ptr machine) { { @@ -120,13 +120,13 @@ bool State::doBuildStep(nix::ref store, Step::ptr step, /* Do the build. */ try { /* FIXME: referring builds may have conflicting timeouts. */ - buildRemote(store, machine, step, build->maxSilentTime, build->buildTimeout, result); + buildRemote(destStore, machine, step, build->maxSilentTime, build->buildTimeout, result); } catch (Error & e) { result.status = BuildResult::MiscFailure; result.errorMsg = e.msg(); } - if (result.success()) res = getBuildOutput(store, step->drv); + if (result.success()) res = getBuildOutput(destStore, step->drv); } time_t stepStopTime = time(0); diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 4d1146f5..1e896ce9 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -7,6 +7,7 @@ #include "state.hh" #include "build-result.hh" +#include "local-binary-cache.hh" #include "shared.hh" #include "globals.hh" @@ -24,6 +25,18 @@ State::State() } +ref State::getLocalStore() +{ + return openStore(); // FIXME: pool +} + + +ref State::getDestStore() +{ + return make_ref(getLocalStore(), "/tmp/binary-cache"); +} + + void State::parseMachines(const std::string & contents) { Machines newMachines, oldMachines; @@ -94,7 +107,8 @@ void State::monitorMachinesFile() getEnv("NIX_REMOTE_SYSTEMS", pathExists(defaultMachinesFile) ? defaultMachinesFile : ""), ":"); if (machinesFiles.empty()) { - parseMachines("localhost " + settings.thisSystem + parseMachines("localhost " + + (settings.thisSystem == "x86_64-linux" ? "x86_64-linux,i686-linux" : settings.thisSystem) + " - " + std::to_string(settings.maxBuildJobs) + " 1"); return; } diff --git a/src/hydra-queue-runner/local-binary-cache.cc b/src/hydra-queue-runner/local-binary-cache.cc new file mode 100644 index 00000000..997f5a51 --- /dev/null +++ b/src/hydra-queue-runner/local-binary-cache.cc @@ -0,0 +1,267 @@ +#include "local-binary-cache.hh" + +#include "archive.hh" +#include "derivations.hh" +#include "globals.hh" +#include "worker-protocol.hh" + +namespace nix { + +LocalBinaryCache::LocalBinaryCache(ref localStore, const Path & binaryCacheDir) + : localStore(localStore), binaryCacheDir(binaryCacheDir) +{ + createDirs(binaryCacheDir + "/nar"); +} + +Path LocalBinaryCache::narInfoFileFor(const Path & storePath) +{ + assertStorePath(storePath); + return binaryCacheDir + "/" + storePathToHash(storePath) + ".narinfo"; +} + +void atomicWrite(const Path & path, const std::string & s) +{ + Path tmp = path + ".tmp." + std::to_string(getpid()); + AutoDelete del(tmp, false); + writeFile(tmp, s); + if (rename(tmp.c_str(), path.c_str())) + throw SysError(format("renaming ‘%1%’ to ‘%2%’") % tmp % path); + del.cancel(); +} + +void LocalBinaryCache::addToCache(const ValidPathInfo & info, + const string & nar) +{ + size_t narSize = nar.size(); + Hash narHash = hashString(htSHA256, nar); + + if (info.hash.type != htUnknown && info.hash != narHash) + throw Error(format("refusing to copy corrupted path ‘%1%’ to binary cache") % info.path); + + printMsg(lvlTalkative, format("copying path ‘%1%’ (%2% bytes) to binary cache") + % info.path % narSize); + + /* Atomically write the NAR file. */ + string narFileRel = "nar/" + printHash(narHash) + ".nar"; + Path narFile = binaryCacheDir + "/" + narFileRel; + if (!pathExists(narFile)) atomicWrite(narFile, nar); + + /* Atomically write the NAR info file.*/ + Path narInfoFile = narInfoFileFor(info.path); + + if (!pathExists(narInfoFile)) { + + Strings refs; + for (auto & r : info.references) + refs.push_back(baseNameOf(r)); + + std::string narInfo; + narInfo += "StorePath: " + info.path + "\n"; + narInfo += "URL: " + narFileRel + "\n"; + narInfo += "Compression: none\n"; + narInfo += "FileHash: sha256:" + printHash(narHash) + "\n"; + narInfo += "FileSize: " + std::to_string(narSize) + "\n"; + narInfo += "NarHash: sha256:" + printHash(narHash) + "\n"; + narInfo += "NarSize: " + std::to_string(narSize) + "\n"; + narInfo += "References: " + concatStringsSep(" ", refs) + "\n"; + + // FIXME: add signature + + atomicWrite(narInfoFile, narInfo); + } +} + +LocalBinaryCache::NarInfo LocalBinaryCache::readNarInfo(const Path & storePath) +{ + NarInfo res; + + Path narInfoFile = narInfoFileFor(storePath); + if (!pathExists(narInfoFile)) + abort(); + std::string narInfo = readFile(narInfoFile); + + auto corrupt = [&]() { + throw Error(format("corrupt NAR info file ‘%1%’") % narInfoFile); + }; + + size_t pos = 0; + while (pos < narInfo.size()) { + + size_t colon = narInfo.find(':', pos); + if (colon == std::string::npos) corrupt(); + + std::string name(narInfo, pos, colon - pos); + + size_t eol = narInfo.find('\n', colon + 2); + if (eol == std::string::npos) corrupt(); + + std::string value(narInfo, colon + 2, eol - colon - 2); + + if (name == "StorePath") { + res.info.path = value; + if (value != storePath) corrupt(); + res.info.path = value; + } + else if (name == "References") { + auto refs = tokenizeString(value, " "); + if (!res.info.references.empty()) corrupt(); + for (auto & r : refs) + res.info.references.insert(settings.nixStore + "/" + r); + } + else if (name == "URL") { + res.narUrl = value; + } + + pos = eol + 1; + } + + if (res.info.path.empty() || res.narUrl.empty()) corrupt(); + + return res; +} + +bool LocalBinaryCache::isValidPath(const Path & storePath) +{ + Path narInfoFile = narInfoFileFor(storePath); + + printMsg(lvlDebug, format("checking %1% -> %2%") % storePath % narInfoFile); + + return pathExists(narInfoFile); +} + +void LocalBinaryCache::exportPath(const Path & storePath, bool sign, Sink & sink) +{ + assert(!sign); + + auto res = readNarInfo(storePath); + + auto nar = readFile(binaryCacheDir + "/" + res.narUrl); + + printMsg(lvlTalkative, format("exporting path ‘%1%’ (%2% bytes)") % storePath % nar.size()); + + assert(nar.size() % 8 == 0); + + sink((unsigned char *) nar.c_str(), nar.size()); + + // FIXME: check integrity of NAR. + + sink << exportMagic << storePath << res.info.references << res.info.deriver << 0; +} + +Paths LocalBinaryCache::importPaths(bool requireSignature, Source & source) +{ + assert(!requireSignature); + Paths res; + while (true) { + unsigned long long n = readLongLong(source); + if (n == 0) break; + if (n != 1) throw Error("input doesn't look like something created by ‘nix-store --export’"); + res.push_back(importPath(source)); + } + return res; +} + +struct TeeSource : Source +{ + Source & readSource; + std::string data; + TeeSource(Source & readSource) : readSource(readSource) + { + } + size_t read(unsigned char * data, size_t len) + { + size_t n = readSource.read(data, len); + this->data.append((char *) data, n); + return n; + } +}; + +struct NopSink : ParseSink +{ +}; + +Path LocalBinaryCache::importPath(Source & source) +{ + /* FIXME: some cut&paste of LocalStore::importPath(). */ + + /* Extract the NAR from the source. */ + TeeSource tee(source); + NopSink sink; + parseDump(sink, tee); + + uint32_t magic = readInt(source); + if (magic != exportMagic) + throw Error("Nix archive cannot be imported; wrong format"); + + ValidPathInfo info; + info.path = readStorePath(source); + + info.references = readStorePaths(source); + + readString(source); // deriver, don't care + + bool haveSignature = readInt(source) == 1; + assert(!haveSignature); + + addToCache(info, tee.data); + + return info.path; +} + +ValidPathInfo LocalBinaryCache::queryPathInfo(const Path & storePath) +{ + return readNarInfo(storePath).info; +} + +void LocalBinaryCache::querySubstitutablePathInfos(const PathSet & paths, + SubstitutablePathInfos & infos) +{ + PathSet left; + + for (auto & storePath : paths) { + if (!localStore->isValidPath(storePath)) { + left.insert(storePath); + continue; + } + ValidPathInfo info = localStore->queryPathInfo(storePath); + SubstitutablePathInfo sub; + sub.references = info.references; + sub.downloadSize = 0; + sub.narSize = info.narSize; + infos.emplace(storePath, sub); + } + + localStore->querySubstitutablePathInfos(left, infos); +} + +void LocalBinaryCache::buildPaths(const PathSet & paths, BuildMode buildMode) +{ + for (auto & storePath : paths) { + assert(!isDerivation(storePath)); + + if (isValidPath(storePath)) continue; + + localStore->addTempRoot(storePath); + + if (!localStore->isValidPath(storePath)) + localStore->ensurePath(storePath); + + ValidPathInfo info = localStore->queryPathInfo(storePath); + + for (auto & ref : info.references) + if (ref != storePath) + ensurePath(ref); + + StringSink sink; + dumpPath(storePath, sink); + + addToCache(info, sink.s); + } +} + +void LocalBinaryCache::ensurePath(const Path & path) +{ + buildPaths({path}); +} + +} diff --git a/src/hydra-queue-runner/local-binary-cache.hh b/src/hydra-queue-runner/local-binary-cache.hh new file mode 100644 index 00000000..4ee61f42 --- /dev/null +++ b/src/hydra-queue-runner/local-binary-cache.hh @@ -0,0 +1,124 @@ +#pragma once + +#include "store-api.hh" + +namespace nix { + +class LocalBinaryCache : public nix::Store +{ +private: + ref localStore; + Path binaryCacheDir; + +public: + + LocalBinaryCache(ref localStore, const Path & binaryCacheDir); + +private: + + Path narInfoFileFor(const Path & storePath); + + void addToCache(const ValidPathInfo & info, const string & nar); + + struct NarInfo + { + ValidPathInfo info; + std::string narUrl; + }; + + NarInfo readNarInfo(const Path & storePath); + +public: + + bool isValidPath(const Path & path) override; + + PathSet queryValidPaths(const PathSet & paths) override + { abort(); } + + PathSet queryAllValidPaths() override + { abort(); } + + ValidPathInfo queryPathInfo(const Path & path) override; + + Hash queryPathHash(const Path & path) override + { abort(); } + + void queryReferrers(const Path & path, + PathSet & referrers) override + { abort(); } + + Path queryDeriver(const Path & path) override + { abort(); } + + PathSet queryValidDerivers(const Path & path) override + { abort(); } + + PathSet queryDerivationOutputs(const Path & path) override + { abort(); } + + StringSet queryDerivationOutputNames(const Path & path) override + { abort(); } + + Path queryPathFromHashPart(const string & hashPart) override + { abort(); } + + PathSet querySubstitutablePaths(const PathSet & paths) override + { abort(); } + + void querySubstitutablePathInfos(const PathSet & paths, + SubstitutablePathInfos & infos) override; + + Path addToStore(const string & name, const Path & srcPath, + bool recursive = true, HashType hashAlgo = htSHA256, + PathFilter & filter = defaultPathFilter, bool repair = false) override + { abort(); } + + Path addTextToStore(const string & name, const string & s, + const PathSet & references, bool repair = false) override + { abort(); } + + void exportPath(const Path & path, bool sign, + Sink & sink) override; + + Paths importPaths(bool requireSignature, Source & source) override; + + Path importPath(Source & source); + + void buildPaths(const PathSet & paths, BuildMode buildMode = bmNormal) override; + + BuildResult buildDerivation(const Path & drvPath, const BasicDerivation & drv, + BuildMode buildMode = bmNormal) override + { abort(); } + + void ensurePath(const Path & path) override; + + void addTempRoot(const Path & path) override + { abort(); } + + void addIndirectRoot(const Path & path) override + { abort(); } + + void syncWithGC() override + { } + + Roots findRoots() override + { abort(); } + + void collectGarbage(const GCOptions & options, GCResults & results) override + { abort(); } + + PathSet queryFailedPaths() override + { return PathSet(); } + + void clearFailedPaths(const PathSet & paths) override + { } + + void optimiseStore() override + { } + + bool verifyStore(bool checkContents, bool repair) override + { return true; } + +}; + +} diff --git a/src/hydra-queue-runner/queue-monitor.cc b/src/hydra-queue-runner/queue-monitor.cc index bf96f489..f7e40827 100644 --- a/src/hydra-queue-runner/queue-monitor.cc +++ b/src/hydra-queue-runner/queue-monitor.cc @@ -30,12 +30,13 @@ void State::queueMonitorLoop() receiver buildsBumped(*conn, "builds_bumped"); receiver jobsetSharesChanged(*conn, "jobset_shares_changed"); - auto store = openStore(); // FIXME: pool + auto localStore = getLocalStore(); + auto destStore = getDestStore(); unsigned int lastBuildId = 0; while (true) { - bool done = getQueuedBuilds(*conn, store, lastBuildId); + bool done = getQueuedBuilds(*conn, localStore, destStore, lastBuildId); /* Sleep until we get notification from the database about an event. */ @@ -63,7 +64,8 @@ void State::queueMonitorLoop() } -bool State::getQueuedBuilds(Connection & conn, ref store, unsigned int & lastBuildId) +bool State::getQueuedBuilds(Connection & conn, ref localStore, + ref destStore, unsigned int & lastBuildId) { printMsg(lvlInfo, format("checking the queue for builds > %1%...") % lastBuildId); @@ -118,7 +120,7 @@ bool State::getQueuedBuilds(Connection & conn, ref store, unsigned int & nrAdded++; newBuildsByID.erase(build->id); - if (!store->isValidPath(build->drvPath)) { + if (!localStore->isValidPath(build->drvPath)) { /* Derivation has been GC'ed prematurely. */ printMsg(lvlError, format("aborting GC'ed build %1%") % build->id); if (!build->finishedInDB) { @@ -138,7 +140,8 @@ bool State::getQueuedBuilds(Connection & conn, ref store, unsigned int & std::set newSteps; std::set finishedDrvs; // FIXME: re-use? - Step::ptr step = createStep(store, conn, build, build->drvPath, build, 0, finishedDrvs, newSteps, newRunnable); + Step::ptr step = createStep(destStore, conn, build, build->drvPath, + build, 0, finishedDrvs, newSteps, newRunnable); /* Some of the new steps may be the top level of builds that we haven't processed yet. So do them now. This ensures that @@ -156,7 +159,7 @@ bool State::getQueuedBuilds(Connection & conn, ref store, unsigned int & all valid. So we mark this as a finished, cached build. */ if (!step) { Derivation drv = readDerivation(build->drvPath); - BuildOutput res = getBuildOutput(store, drv); + BuildOutput res = getBuildOutput(destStore, drv); pqxx::work txn(conn); time_t now = time(0); @@ -314,7 +317,7 @@ void State::processQueueChange(Connection & conn) } -Step::ptr State::createStep(ref store, +Step::ptr State::createStep(ref destStore, Connection & conn, Build::ptr build, const Path & drvPath, Build::ptr referringBuild, Step::ptr referringStep, std::set & finishedDrvs, std::set & newSteps, std::set & newRunnable) @@ -394,7 +397,7 @@ Step::ptr State::createStep(ref store, DerivationOutputs missing; PathSet missingPaths; for (auto & i : step->drv.outputs) - if (!store->isValidPath(i.second.path)) { + if (!destStore->isValidPath(i.second.path)) { valid = false; missing[i.first] = i.second; missingPaths.insert(i.second.path); @@ -406,7 +409,7 @@ Step::ptr State::createStep(ref store, assert(missing.size() == missingPaths.size()); if (!missing.empty() && settings.useSubstitutes) { SubstitutablePathInfos infos; - store->querySubstitutablePathInfos(missingPaths, infos); + destStore->querySubstitutablePathInfos(missingPaths, infos); // FIXME if (infos.size() == missingPaths.size()) { valid = true; for (auto & i : missing) { @@ -414,7 +417,7 @@ Step::ptr State::createStep(ref store, printMsg(lvlInfo, format("substituting output ‘%1%’ of ‘%2%’") % i.second.path % drvPath); time_t startTime = time(0); - store->ensurePath(i.second.path); + destStore->ensurePath(i.second.path); time_t stopTime = time(0); { @@ -443,7 +446,7 @@ Step::ptr State::createStep(ref store, /* Create steps for the dependencies. */ for (auto & i : step->drv.inputDrvs) { - auto dep = createStep(store, conn, build, i.first, 0, step, finishedDrvs, newSteps, newRunnable); + auto dep = createStep(destStore, conn, build, i.first, 0, step, finishedDrvs, newSteps, newRunnable); if (dep) { auto step_(step->state.lock()); step_->deps.insert(dep); diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index a296ddc7..1b33edaa 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -350,6 +350,13 @@ public: private: + /* Return a store object that can access derivations produced by + hydra-evaluator. */ + nix::ref getLocalStore(); + + /* Return a store object to store build results. */ + nix::ref getDestStore(); + void clearBusy(Connection & conn, time_t stopTime); void parseMachines(const std::string & contents); @@ -377,7 +384,8 @@ private: void queueMonitorLoop(); /* Check the queue for new builds. */ - bool getQueuedBuilds(Connection & conn, nix::ref store, unsigned int & lastBuildId); + bool getQueuedBuilds(Connection & conn, nix::ref localStore, + nix::ref destStore, unsigned int & lastBuildId); /* Handle cancellation, deletion and priority bumps. */ void processQueueChange(Connection & conn); @@ -405,10 +413,10 @@ private: /* Perform the given build step. Return true if the step is to be retried. */ - bool doBuildStep(nix::ref store, Step::ptr step, + bool doBuildStep(nix::ref destStore, Step::ptr step, Machine::ptr machine); - void buildRemote(nix::ref store, + void buildRemote(nix::ref destStore, Machine::ptr machine, Step::ptr step, unsigned int maxSilentTime, unsigned int buildTimeout, RemoteResult & result);