hydra-queue-runner: Write directly to a binary cache

This commit is contained in:
Eelco Dolstra 2016-02-15 21:10:29 +01:00
parent e46acbf05b
commit 2d0dd7fb49
9 changed files with 463 additions and 34 deletions

View file

@ -2,7 +2,8 @@ bin_PROGRAMS = hydra-queue-runner
hydra_queue_runner_SOURCES = hydra-queue-runner.cc queue-monitor.cc dispatcher.cc \
builder.cc build-result.cc build-remote.cc \
build-result.hh counter.hh pool.hh sync.hh token-server.hh state.hh db.hh
build-result.hh counter.hh pool.hh sync.hh token-server.hh state.hh db.hh \
local-binary-cache.hh local-binary-cache.cc
hydra_queue_runner_LDADD = $(NIX_LIBS) -lpqxx
AM_CXXFLAGS = $(NIX_CFLAGS) -Wall

View file

@ -73,14 +73,14 @@ static void openConnection(Machine::ptr machine, Path tmpDir, int stderrFD, Chil
}
static void copyClosureTo(ref<Store> store,
static void copyClosureTo(ref<Store> destStore,
FdSource & from, FdSink & to, const PathSet & paths,
counter & bytesSent,
bool useSubstitutes = false)
{
PathSet closure;
for (auto & path : paths)
store->computeFSClosure(path, closure);
destStore->computeFSClosure(path, closure);
/* Send the "query valid paths" command with the "lock" option
enabled. This prevents a race where the remote host
@ -95,7 +95,7 @@ static void copyClosureTo(ref<Store> store,
if (present.size() == closure.size()) return;
Paths sorted = store->topoSortPaths(closure);
Paths sorted = destStore->topoSortPaths(closure);
Paths missing;
for (auto i = sorted.rbegin(); i != sorted.rend(); ++i)
@ -104,10 +104,10 @@ static void copyClosureTo(ref<Store> store,
printMsg(lvlDebug, format("sending %1% missing paths") % missing.size());
for (auto & p : missing)
bytesSent += store->queryPathInfo(p).narSize;
bytesSent += destStore->queryPathInfo(p).narSize;
to << cmdImportPaths;
store->exportPaths(missing, false, to);
destStore->exportPaths(missing, false, to);
to.flush();
if (readInt(from) != 1)
@ -115,19 +115,19 @@ static void copyClosureTo(ref<Store> store,
}
static void copyClosureFrom(ref<Store> store,
static void copyClosureFrom(ref<Store> destStore,
FdSource & from, FdSink & to, const PathSet & paths, counter & bytesReceived)
{
to << cmdExportPaths << 0 << paths;
to.flush();
store->importPaths(false, from);
destStore->importPaths(false, from);
for (auto & p : paths)
bytesReceived += store->queryPathInfo(p).narSize;
bytesReceived += destStore->queryPathInfo(p).narSize;
}
void State::buildRemote(ref<Store> store,
void State::buildRemote(ref<Store> destStore,
Machine::ptr machine, Step::ptr step,
unsigned int maxSilentTime, unsigned int buildTimeout,
RemoteResult & result)
@ -222,14 +222,20 @@ void State::buildRemote(ref<Store> store,
}
}
/* Ensure that the inputs exist in the destination store. This is
a no-op for regular stores, but for the binary cache store,
this will copy the inputs to the binary cache from the local
store. */
destStore->buildPaths(basicDrv.inputSrcs);
/* Copy the input closure. */
if (machine->sshName != "localhost") {
if (/* machine->sshName != "localhost" */ true) {
auto mc1 = std::make_shared<MaintainCount>(nrStepsWaiting);
std::lock_guard<std::mutex> sendLock(machine->state->sendLock);
mc1.reset();
MaintainCount mc2(nrStepsCopyingTo);
printMsg(lvlDebug, format("sending closure of %1% to %2%") % step->drvPath % machine->sshName);
copyClosureTo(store, from, to, inputs, bytesSent);
copyClosureTo(destStore, from, to, inputs, bytesSent);
}
autoDelete.cancel();
@ -277,13 +283,13 @@ void State::buildRemote(ref<Store> store,
}
/* Copy the output paths. */
if (machine->sshName != "localhost") {
if (/* machine->sshName != "localhost" */ true) {
printMsg(lvlDebug, format("copying outputs of %1% from %2%") % step->drvPath % machine->sshName);
PathSet outputs;
for (auto & output : step->drv.outputs)
outputs.insert(output.second.path);
MaintainCount mc(nrStepsCopyingFrom);
copyClosureFrom(store, from, to, outputs, bytesReceived);
copyClosureFrom(destStore, from, to, outputs, bytesReceived);
}
/* Shut down the connection. */

View file

@ -41,6 +41,7 @@ BuildOutput getBuildOutput(nix::ref<Store> store, const Derivation & drv)
/* Get build products. */
bool explicitProducts = false;
#if 0
Regex regex(
"(([a-zA-Z0-9_-]+)" // type (e.g. "doc")
"[[:space:]]+"
@ -97,6 +98,7 @@ BuildOutput getBuildOutput(nix::ref<Store> store, const Derivation & drv)
res.products.push_back(product);
}
}
#endif
/* If no build products were explicitly declared, then add all
outputs as a product of type "nix-build". */
@ -108,14 +110,17 @@ BuildOutput getBuildOutput(nix::ref<Store> store, const Derivation & drv)
product.subtype = output.first == "out" ? "" : output.first;
product.name = storePathToName(product.path);
#if 0
struct stat st;
if (stat(product.path.c_str(), &st))
throw SysError(format("getting status of %1%") % product.path);
if (S_ISDIR(st.st_mode))
#endif
res.products.push_back(product);
}
}
#if 0
/* Get the release name from $output/nix-support/hydra-release-name. */
for (auto & output : outputs) {
Path p = output + "/nix-support/hydra-release-name";
@ -139,6 +144,7 @@ BuildOutput getBuildOutput(nix::ref<Store> store, const Derivation & drv)
res.metrics[metric.name] = metric;
}
}
#endif
return res;
}

View file

@ -15,8 +15,8 @@ void State::builder(MachineReservation::ptr reservation)
auto step = reservation->step;
try {
auto store = openStore(); // FIXME: pool
retry = doBuildStep(store, step, reservation->machine);
auto destStore = getDestStore();
retry = doBuildStep(destStore, step, reservation->machine);
} catch (std::exception & e) {
printMsg(lvlError, format("uncaught exception building %1% on %2%: %3%")
% step->drvPath % reservation->machine->sshName % e.what());
@ -45,7 +45,7 @@ void State::builder(MachineReservation::ptr reservation)
}
bool State::doBuildStep(nix::ref<Store> store, Step::ptr step,
bool State::doBuildStep(nix::ref<Store> destStore, Step::ptr step,
Machine::ptr machine)
{
{
@ -120,13 +120,13 @@ bool State::doBuildStep(nix::ref<Store> store, Step::ptr step,
/* Do the build. */
try {
/* FIXME: referring builds may have conflicting timeouts. */
buildRemote(store, machine, step, build->maxSilentTime, build->buildTimeout, result);
buildRemote(destStore, machine, step, build->maxSilentTime, build->buildTimeout, result);
} catch (Error & e) {
result.status = BuildResult::MiscFailure;
result.errorMsg = e.msg();
}
if (result.success()) res = getBuildOutput(store, step->drv);
if (result.success()) res = getBuildOutput(destStore, step->drv);
}
time_t stepStopTime = time(0);

View file

@ -7,6 +7,7 @@
#include "state.hh"
#include "build-result.hh"
#include "local-binary-cache.hh"
#include "shared.hh"
#include "globals.hh"
@ -24,6 +25,18 @@ State::State()
}
ref<Store> State::getLocalStore()
{
return openStore(); // FIXME: pool
}
ref<Store> State::getDestStore()
{
return make_ref<LocalBinaryCache>(getLocalStore(), "/tmp/binary-cache");
}
void State::parseMachines(const std::string & contents)
{
Machines newMachines, oldMachines;
@ -94,7 +107,8 @@ void State::monitorMachinesFile()
getEnv("NIX_REMOTE_SYSTEMS", pathExists(defaultMachinesFile) ? defaultMachinesFile : ""), ":");
if (machinesFiles.empty()) {
parseMachines("localhost " + settings.thisSystem
parseMachines("localhost " +
(settings.thisSystem == "x86_64-linux" ? "x86_64-linux,i686-linux" : settings.thisSystem)
+ " - " + std::to_string(settings.maxBuildJobs) + " 1");
return;
}

View file

@ -0,0 +1,267 @@
#include "local-binary-cache.hh"
#include "archive.hh"
#include "derivations.hh"
#include "globals.hh"
#include "worker-protocol.hh"
namespace nix {
LocalBinaryCache::LocalBinaryCache(ref<Store> localStore, const Path & binaryCacheDir)
: localStore(localStore), binaryCacheDir(binaryCacheDir)
{
createDirs(binaryCacheDir + "/nar");
}
Path LocalBinaryCache::narInfoFileFor(const Path & storePath)
{
assertStorePath(storePath);
return binaryCacheDir + "/" + storePathToHash(storePath) + ".narinfo";
}
void atomicWrite(const Path & path, const std::string & s)
{
Path tmp = path + ".tmp." + std::to_string(getpid());
AutoDelete del(tmp, false);
writeFile(tmp, s);
if (rename(tmp.c_str(), path.c_str()))
throw SysError(format("renaming %1% to %2%") % tmp % path);
del.cancel();
}
void LocalBinaryCache::addToCache(const ValidPathInfo & info,
const string & nar)
{
size_t narSize = nar.size();
Hash narHash = hashString(htSHA256, nar);
if (info.hash.type != htUnknown && info.hash != narHash)
throw Error(format("refusing to copy corrupted path %1% to binary cache") % info.path);
printMsg(lvlTalkative, format("copying path %1% (%2% bytes) to binary cache")
% info.path % narSize);
/* Atomically write the NAR file. */
string narFileRel = "nar/" + printHash(narHash) + ".nar";
Path narFile = binaryCacheDir + "/" + narFileRel;
if (!pathExists(narFile)) atomicWrite(narFile, nar);
/* Atomically write the NAR info file.*/
Path narInfoFile = narInfoFileFor(info.path);
if (!pathExists(narInfoFile)) {
Strings refs;
for (auto & r : info.references)
refs.push_back(baseNameOf(r));
std::string narInfo;
narInfo += "StorePath: " + info.path + "\n";
narInfo += "URL: " + narFileRel + "\n";
narInfo += "Compression: none\n";
narInfo += "FileHash: sha256:" + printHash(narHash) + "\n";
narInfo += "FileSize: " + std::to_string(narSize) + "\n";
narInfo += "NarHash: sha256:" + printHash(narHash) + "\n";
narInfo += "NarSize: " + std::to_string(narSize) + "\n";
narInfo += "References: " + concatStringsSep(" ", refs) + "\n";
// FIXME: add signature
atomicWrite(narInfoFile, narInfo);
}
}
LocalBinaryCache::NarInfo LocalBinaryCache::readNarInfo(const Path & storePath)
{
NarInfo res;
Path narInfoFile = narInfoFileFor(storePath);
if (!pathExists(narInfoFile))
abort();
std::string narInfo = readFile(narInfoFile);
auto corrupt = [&]() {
throw Error(format("corrupt NAR info file %1%") % narInfoFile);
};
size_t pos = 0;
while (pos < narInfo.size()) {
size_t colon = narInfo.find(':', pos);
if (colon == std::string::npos) corrupt();
std::string name(narInfo, pos, colon - pos);
size_t eol = narInfo.find('\n', colon + 2);
if (eol == std::string::npos) corrupt();
std::string value(narInfo, colon + 2, eol - colon - 2);
if (name == "StorePath") {
res.info.path = value;
if (value != storePath) corrupt();
res.info.path = value;
}
else if (name == "References") {
auto refs = tokenizeString<Strings>(value, " ");
if (!res.info.references.empty()) corrupt();
for (auto & r : refs)
res.info.references.insert(settings.nixStore + "/" + r);
}
else if (name == "URL") {
res.narUrl = value;
}
pos = eol + 1;
}
if (res.info.path.empty() || res.narUrl.empty()) corrupt();
return res;
}
bool LocalBinaryCache::isValidPath(const Path & storePath)
{
Path narInfoFile = narInfoFileFor(storePath);
printMsg(lvlDebug, format("checking %1% -> %2%") % storePath % narInfoFile);
return pathExists(narInfoFile);
}
void LocalBinaryCache::exportPath(const Path & storePath, bool sign, Sink & sink)
{
assert(!sign);
auto res = readNarInfo(storePath);
auto nar = readFile(binaryCacheDir + "/" + res.narUrl);
printMsg(lvlTalkative, format("exporting path %1% (%2% bytes)") % storePath % nar.size());
assert(nar.size() % 8 == 0);
sink((unsigned char *) nar.c_str(), nar.size());
// FIXME: check integrity of NAR.
sink << exportMagic << storePath << res.info.references << res.info.deriver << 0;
}
Paths LocalBinaryCache::importPaths(bool requireSignature, Source & source)
{
assert(!requireSignature);
Paths res;
while (true) {
unsigned long long n = readLongLong(source);
if (n == 0) break;
if (n != 1) throw Error("input doesn't look like something created by nix-store --export");
res.push_back(importPath(source));
}
return res;
}
struct TeeSource : Source
{
Source & readSource;
std::string data;
TeeSource(Source & readSource) : readSource(readSource)
{
}
size_t read(unsigned char * data, size_t len)
{
size_t n = readSource.read(data, len);
this->data.append((char *) data, n);
return n;
}
};
struct NopSink : ParseSink
{
};
Path LocalBinaryCache::importPath(Source & source)
{
/* FIXME: some cut&paste of LocalStore::importPath(). */
/* Extract the NAR from the source. */
TeeSource tee(source);
NopSink sink;
parseDump(sink, tee);
uint32_t magic = readInt(source);
if (magic != exportMagic)
throw Error("Nix archive cannot be imported; wrong format");
ValidPathInfo info;
info.path = readStorePath(source);
info.references = readStorePaths<PathSet>(source);
readString(source); // deriver, don't care
bool haveSignature = readInt(source) == 1;
assert(!haveSignature);
addToCache(info, tee.data);
return info.path;
}
ValidPathInfo LocalBinaryCache::queryPathInfo(const Path & storePath)
{
return readNarInfo(storePath).info;
}
void LocalBinaryCache::querySubstitutablePathInfos(const PathSet & paths,
SubstitutablePathInfos & infos)
{
PathSet left;
for (auto & storePath : paths) {
if (!localStore->isValidPath(storePath)) {
left.insert(storePath);
continue;
}
ValidPathInfo info = localStore->queryPathInfo(storePath);
SubstitutablePathInfo sub;
sub.references = info.references;
sub.downloadSize = 0;
sub.narSize = info.narSize;
infos.emplace(storePath, sub);
}
localStore->querySubstitutablePathInfos(left, infos);
}
void LocalBinaryCache::buildPaths(const PathSet & paths, BuildMode buildMode)
{
for (auto & storePath : paths) {
assert(!isDerivation(storePath));
if (isValidPath(storePath)) continue;
localStore->addTempRoot(storePath);
if (!localStore->isValidPath(storePath))
localStore->ensurePath(storePath);
ValidPathInfo info = localStore->queryPathInfo(storePath);
for (auto & ref : info.references)
if (ref != storePath)
ensurePath(ref);
StringSink sink;
dumpPath(storePath, sink);
addToCache(info, sink.s);
}
}
void LocalBinaryCache::ensurePath(const Path & path)
{
buildPaths({path});
}
}

View file

@ -0,0 +1,124 @@
#pragma once
#include "store-api.hh"
namespace nix {
class LocalBinaryCache : public nix::Store
{
private:
ref<Store> localStore;
Path binaryCacheDir;
public:
LocalBinaryCache(ref<Store> localStore, const Path & binaryCacheDir);
private:
Path narInfoFileFor(const Path & storePath);
void addToCache(const ValidPathInfo & info, const string & nar);
struct NarInfo
{
ValidPathInfo info;
std::string narUrl;
};
NarInfo readNarInfo(const Path & storePath);
public:
bool isValidPath(const Path & path) override;
PathSet queryValidPaths(const PathSet & paths) override
{ abort(); }
PathSet queryAllValidPaths() override
{ abort(); }
ValidPathInfo queryPathInfo(const Path & path) override;
Hash queryPathHash(const Path & path) override
{ abort(); }
void queryReferrers(const Path & path,
PathSet & referrers) override
{ abort(); }
Path queryDeriver(const Path & path) override
{ abort(); }
PathSet queryValidDerivers(const Path & path) override
{ abort(); }
PathSet queryDerivationOutputs(const Path & path) override
{ abort(); }
StringSet queryDerivationOutputNames(const Path & path) override
{ abort(); }
Path queryPathFromHashPart(const string & hashPart) override
{ abort(); }
PathSet querySubstitutablePaths(const PathSet & paths) override
{ abort(); }
void querySubstitutablePathInfos(const PathSet & paths,
SubstitutablePathInfos & infos) override;
Path addToStore(const string & name, const Path & srcPath,
bool recursive = true, HashType hashAlgo = htSHA256,
PathFilter & filter = defaultPathFilter, bool repair = false) override
{ abort(); }
Path addTextToStore(const string & name, const string & s,
const PathSet & references, bool repair = false) override
{ abort(); }
void exportPath(const Path & path, bool sign,
Sink & sink) override;
Paths importPaths(bool requireSignature, Source & source) override;
Path importPath(Source & source);
void buildPaths(const PathSet & paths, BuildMode buildMode = bmNormal) override;
BuildResult buildDerivation(const Path & drvPath, const BasicDerivation & drv,
BuildMode buildMode = bmNormal) override
{ abort(); }
void ensurePath(const Path & path) override;
void addTempRoot(const Path & path) override
{ abort(); }
void addIndirectRoot(const Path & path) override
{ abort(); }
void syncWithGC() override
{ }
Roots findRoots() override
{ abort(); }
void collectGarbage(const GCOptions & options, GCResults & results) override
{ abort(); }
PathSet queryFailedPaths() override
{ return PathSet(); }
void clearFailedPaths(const PathSet & paths) override
{ }
void optimiseStore() override
{ }
bool verifyStore(bool checkContents, bool repair) override
{ return true; }
};
}

View file

@ -30,12 +30,13 @@ void State::queueMonitorLoop()
receiver buildsBumped(*conn, "builds_bumped");
receiver jobsetSharesChanged(*conn, "jobset_shares_changed");
auto store = openStore(); // FIXME: pool
auto localStore = getLocalStore();
auto destStore = getDestStore();
unsigned int lastBuildId = 0;
while (true) {
bool done = getQueuedBuilds(*conn, store, lastBuildId);
bool done = getQueuedBuilds(*conn, localStore, destStore, lastBuildId);
/* Sleep until we get notification from the database about an
event. */
@ -63,7 +64,8 @@ void State::queueMonitorLoop()
}
bool State::getQueuedBuilds(Connection & conn, ref<Store> store, unsigned int & lastBuildId)
bool State::getQueuedBuilds(Connection & conn, ref<Store> localStore,
ref<Store> destStore, unsigned int & lastBuildId)
{
printMsg(lvlInfo, format("checking the queue for builds > %1%...") % lastBuildId);
@ -118,7 +120,7 @@ bool State::getQueuedBuilds(Connection & conn, ref<Store> store, unsigned int &
nrAdded++;
newBuildsByID.erase(build->id);
if (!store->isValidPath(build->drvPath)) {
if (!localStore->isValidPath(build->drvPath)) {
/* Derivation has been GC'ed prematurely. */
printMsg(lvlError, format("aborting GC'ed build %1%") % build->id);
if (!build->finishedInDB) {
@ -138,7 +140,8 @@ bool State::getQueuedBuilds(Connection & conn, ref<Store> store, unsigned int &
std::set<Step::ptr> newSteps;
std::set<Path> finishedDrvs; // FIXME: re-use?
Step::ptr step = createStep(store, conn, build, build->drvPath, build, 0, finishedDrvs, newSteps, newRunnable);
Step::ptr step = createStep(destStore, conn, build, build->drvPath,
build, 0, finishedDrvs, newSteps, newRunnable);
/* Some of the new steps may be the top level of builds that
we haven't processed yet. So do them now. This ensures that
@ -156,7 +159,7 @@ bool State::getQueuedBuilds(Connection & conn, ref<Store> store, unsigned int &
all valid. So we mark this as a finished, cached build. */
if (!step) {
Derivation drv = readDerivation(build->drvPath);
BuildOutput res = getBuildOutput(store, drv);
BuildOutput res = getBuildOutput(destStore, drv);
pqxx::work txn(conn);
time_t now = time(0);
@ -314,7 +317,7 @@ void State::processQueueChange(Connection & conn)
}
Step::ptr State::createStep(ref<Store> store,
Step::ptr State::createStep(ref<Store> destStore,
Connection & conn, Build::ptr build, const Path & drvPath,
Build::ptr referringBuild, Step::ptr referringStep, std::set<Path> & finishedDrvs,
std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable)
@ -394,7 +397,7 @@ Step::ptr State::createStep(ref<Store> store,
DerivationOutputs missing;
PathSet missingPaths;
for (auto & i : step->drv.outputs)
if (!store->isValidPath(i.second.path)) {
if (!destStore->isValidPath(i.second.path)) {
valid = false;
missing[i.first] = i.second;
missingPaths.insert(i.second.path);
@ -406,7 +409,7 @@ Step::ptr State::createStep(ref<Store> store,
assert(missing.size() == missingPaths.size());
if (!missing.empty() && settings.useSubstitutes) {
SubstitutablePathInfos infos;
store->querySubstitutablePathInfos(missingPaths, infos);
destStore->querySubstitutablePathInfos(missingPaths, infos); // FIXME
if (infos.size() == missingPaths.size()) {
valid = true;
for (auto & i : missing) {
@ -414,7 +417,7 @@ Step::ptr State::createStep(ref<Store> store,
printMsg(lvlInfo, format("substituting output %1% of %2%") % i.second.path % drvPath);
time_t startTime = time(0);
store->ensurePath(i.second.path);
destStore->ensurePath(i.second.path);
time_t stopTime = time(0);
{
@ -443,7 +446,7 @@ Step::ptr State::createStep(ref<Store> store,
/* Create steps for the dependencies. */
for (auto & i : step->drv.inputDrvs) {
auto dep = createStep(store, conn, build, i.first, 0, step, finishedDrvs, newSteps, newRunnable);
auto dep = createStep(destStore, conn, build, i.first, 0, step, finishedDrvs, newSteps, newRunnable);
if (dep) {
auto step_(step->state.lock());
step_->deps.insert(dep);

View file

@ -350,6 +350,13 @@ public:
private:
/* Return a store object that can access derivations produced by
hydra-evaluator. */
nix::ref<nix::Store> getLocalStore();
/* Return a store object to store build results. */
nix::ref<nix::Store> getDestStore();
void clearBusy(Connection & conn, time_t stopTime);
void parseMachines(const std::string & contents);
@ -377,7 +384,8 @@ private:
void queueMonitorLoop();
/* Check the queue for new builds. */
bool getQueuedBuilds(Connection & conn, nix::ref<nix::Store> store, unsigned int & lastBuildId);
bool getQueuedBuilds(Connection & conn, nix::ref<nix::Store> localStore,
nix::ref<nix::Store> destStore, unsigned int & lastBuildId);
/* Handle cancellation, deletion and priority bumps. */
void processQueueChange(Connection & conn);
@ -405,10 +413,10 @@ private:
/* Perform the given build step. Return true if the step is to be
retried. */
bool doBuildStep(nix::ref<nix::Store> store, Step::ptr step,
bool doBuildStep(nix::ref<nix::Store> destStore, Step::ptr step,
Machine::ptr machine);
void buildRemote(nix::ref<nix::Store> store,
void buildRemote(nix::ref<nix::Store> destStore,
Machine::ptr machine, Step::ptr step,
unsigned int maxSilentTime, unsigned int buildTimeout,
RemoteResult & result);