diff --git a/hydra-module.nix b/hydra-module.nix index 30082e0f..059769fa 100644 --- a/hydra-module.nix +++ b/hydra-module.nix @@ -291,11 +291,14 @@ in IN_SYSTEMD = "1"; # to get log severity levels }; serviceConfig = - { ExecStartPre = "${cfg.package}/bin/hydra-queue-runner --unlock"; - ExecStart = "@${cfg.package}/bin/hydra-queue-runner hydra-queue-runner -v"; + { ExecStart = "@${cfg.package}/bin/hydra-queue-runner hydra-queue-runner -v --option build-use-substitutes false"; ExecStopPost = "${cfg.package}/bin/hydra-queue-runner --unlock"; User = "hydra-queue-runner"; Restart = "always"; + + # Ensure we can get core dumps. + LimitCORE = "infinity"; + WorkingDirectory = "${baseDir}/queue-runner"; }; }; diff --git a/release.nix b/release.nix index 70f3505b..e482b2a1 100644 --- a/release.nix +++ b/release.nix @@ -159,6 +159,10 @@ rec { guile # optional, for Guile + Guix support perlDeps perl postgresql92 # for running the tests + (aws-sdk-cpp.override { + apis = ["s3"]; + customMemoryManagement = false; + }) ]; hydraPath = lib.makeSearchPath "bin" ( diff --git a/src/hydra-queue-runner/Makefile.am b/src/hydra-queue-runner/Makefile.am index 089187d1..12b7e01e 100644 --- a/src/hydra-queue-runner/Makefile.am +++ b/src/hydra-queue-runner/Makefile.am @@ -2,7 +2,9 @@ bin_PROGRAMS = hydra-queue-runner hydra_queue_runner_SOURCES = hydra-queue-runner.cc queue-monitor.cc dispatcher.cc \ builder.cc build-result.cc build-remote.cc \ - build-result.hh counter.hh pool.hh sync.hh token-server.hh state.hh db.hh + build-result.hh counter.hh token-server.hh state.hh db.hh \ + s3-binary-cache-store.hh s3-binary-cache-store.cc \ + finally.hh hydra_queue_runner_LDADD = $(NIX_LIBS) -lpqxx -AM_CXXFLAGS = $(NIX_CFLAGS) -Wall +AM_CXXFLAGS = $(NIX_CFLAGS) -Wall -laws-cpp-sdk-s3 diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index 787bf43f..ef3ca2e3 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ 
-8,6 +8,7 @@ #include "state.hh" #include "util.hh" #include "worker-protocol.hh" +#include "finally.hh" using namespace nix; @@ -73,19 +74,19 @@ static void openConnection(Machine::ptr machine, Path tmpDir, int stderrFD, Chil } -static void copyClosureTo(ref store, +static void copyClosureTo(ref destStore, FdSource & from, FdSink & to, const PathSet & paths, - counter & bytesSent, bool useSubstitutes = false) { PathSet closure; for (auto & path : paths) - store->computeFSClosure(path, closure); + destStore->computeFSClosure(path, closure); /* Send the "query valid paths" command with the "lock" option enabled. This prevents a race where the remote host garbage-collect paths that are already there. Optionally, ask the remote host to substitute missing paths. */ + // FIXME: substitute output pollutes our build log to << cmdQueryValidPaths << 1 << useSubstitutes << closure; to.flush(); @@ -95,7 +96,7 @@ static void copyClosureTo(ref store, if (present.size() == closure.size()) return; - Paths sorted = store->topoSortPaths(closure); + Paths sorted = destStore->topoSortPaths(closure); Paths missing; for (auto i = sorted.rbegin(); i != sorted.rend(); ++i) @@ -103,11 +104,8 @@ static void copyClosureTo(ref store, printMsg(lvlDebug, format("sending %1% missing paths") % missing.size()); - for (auto & p : missing) - bytesSent += store->queryPathInfo(p).narSize; - to << cmdImportPaths; - store->exportPaths(missing, false, to); + destStore->exportPaths(missing, false, to); to.flush(); if (readInt(from) != 1) @@ -115,19 +113,7 @@ static void copyClosureTo(ref store, } -static void copyClosureFrom(ref store, - FdSource & from, FdSink & to, const PathSet & paths, counter & bytesReceived) -{ - to << cmdExportPaths << 0 << paths; - to.flush(); - store->importPaths(false, from); - - for (auto & p : paths) - bytesReceived += store->queryPathInfo(p).narSize; -} - - -void State::buildRemote(ref store, +void State::buildRemote(ref destStore, Machine::ptr machine, Step::ptr step, 
unsigned int maxSilentTime, unsigned int buildTimeout, RemoteResult & result) @@ -152,6 +138,11 @@ void State::buildRemote(ref store, FdSource from(child.from); FdSink to(child.to); + Finally updateStats([&]() { + bytesReceived += from.read; + bytesSent += to.written; + }); + /* Handshake. */ bool sendDerivation = true; unsigned int remoteVersion; @@ -222,8 +213,14 @@ void State::buildRemote(ref store, } } + /* Ensure that the inputs exist in the destination store. This is + a no-op for regular stores, but for the binary cache store, + this will copy the inputs to the binary cache from the local + store. */ + destStore->buildPaths(basicDrv.inputSrcs); + /* Copy the input closure. */ - if (machine->sshName != "localhost") { + if (/* machine->sshName != "localhost" */ true) { auto mc1 = std::make_shared(nrStepsWaiting); std::lock_guard sendLock(machine->state->sendLock); mc1.reset(); @@ -232,7 +229,7 @@ void State::buildRemote(ref store, auto now1 = std::chrono::steady_clock::now(); - copyClosureTo(store, from, to, inputs, bytesSent); + copyClosureTo(destStore, from, to, inputs, true); auto now2 = std::chrono::steady_clock::now(); @@ -279,21 +276,26 @@ void State::buildRemote(ref store, /* If the path was substituted or already valid, then we didn't get a build log. */ if (result.status == BuildResult::Substituted || result.status == BuildResult::AlreadyValid) { + printMsg(lvlInfo, format("outputs of ‘%1%’ substituted or already valid on ‘%2%’") % step->drvPath % machine->sshName); unlink(result.logFile.c_str()); result.logFile = ""; } /* Copy the output paths. 
*/ - if (machine->sshName != "localhost") { + if (/* machine->sshName != "localhost" */ true) { printMsg(lvlDebug, format("copying outputs of ‘%1%’ from ‘%2%’") % step->drvPath % machine->sshName); PathSet outputs; for (auto & output : step->drv.outputs) outputs.insert(output.second.path); MaintainCount mc(nrStepsCopyingFrom); + result.accessor = destStore->getFSAccessor(); + auto now1 = std::chrono::steady_clock::now(); - copyClosureFrom(store, from, to, outputs, bytesReceived); + to << cmdExportPaths << 0 << outputs; + to.flush(); + destStore->importPaths(false, from, result.accessor); auto now2 = std::chrono::steady_clock::now(); diff --git a/src/hydra-queue-runner/build-result.cc b/src/hydra-queue-runner/build-result.cc index ebf07035..6b2741ea 100644 --- a/src/hydra-queue-runner/build-result.cc +++ b/src/hydra-queue-runner/build-result.cc @@ -2,26 +2,13 @@ #include "store-api.hh" #include "util.hh" #include "regex.hh" +#include "fs-accessor.hh" using namespace nix; -static std::tuple secureRead(Path fileName) -{ - auto fail = std::make_tuple(false, ""); - - if (!pathExists(fileName)) return fail; - - try { - /* For security, resolve symlinks. 
*/ - fileName = canonPath(fileName, true); - if (!isInStore(fileName)) return fail; - return std::make_tuple(true, readFile(fileName)); - } catch (Error & e) { return fail; } -} - - -BuildOutput getBuildOutput(nix::ref store, const Derivation & drv) +BuildOutput getBuildOutput(nix::ref store, + nix::ref accessor, const Derivation & drv) { BuildOutput res; @@ -52,14 +39,16 @@ BuildOutput getBuildOutput(nix::ref store, const Derivation & drv) for (auto & output : outputs) { Path failedFile = output + "/nix-support/failed"; - if (pathExists(failedFile)) res.failed = true; + if (accessor->stat(failedFile).type == FSAccessor::Type::tRegular) + res.failed = true; - auto file = secureRead(output + "/nix-support/hydra-build-products"); - if (!std::get<0>(file)) continue; + Path productsFile = output + "/nix-support/hydra-build-products"; + if (accessor->stat(productsFile).type != FSAccessor::Type::tRegular) + continue; explicitProducts = true; - for (auto & line : tokenizeString(std::get<1>(file), "\n")) { + for (auto & line : tokenizeString(accessor->readFile(productsFile), "\n")) { BuildProduct product; Regex::Subs subs; @@ -72,26 +61,23 @@ BuildOutput getBuildOutput(nix::ref store, const Derivation & drv) /* Ensure that the path exists and points into the Nix store. */ + // FIXME: should we disallow products referring to other + // store paths, or that are outside the input closure? if (product.path == "" || product.path[0] != '/') continue; - try { - product.path = canonPath(product.path, true); - } catch (Error & e) { continue; } - if (!isInStore(product.path) || !pathExists(product.path)) continue; + product.path = canonPath(product.path); + if (!isInStore(product.path)) continue; - /* FIXME: check that the path is in the input closure - of the build? */ + auto st = accessor->stat(product.path); + if (st.type == FSAccessor::Type::tMissing) continue; product.name = product.path == output ? 
"" : baseNameOf(product.path); - struct stat st; - if (stat(product.path.c_str(), &st)) - throw SysError(format("getting status of ‘%1%’") % product.path); - - if (S_ISREG(st.st_mode)) { + if (st.type == FSAccessor::Type::tRegular) { product.isRegular = true; - product.fileSize = st.st_size; - product.sha1hash = hashFile(htSHA1, product.path); - product.sha256hash = hashFile(htSHA256, product.path); + product.fileSize = st.fileSize; + auto contents = accessor->readFile(product.path); + product.sha1hash = hashString(htSHA1, contents); + product.sha256hash = hashString(htSHA256, contents); } res.products.push_back(product); @@ -108,10 +94,10 @@ BuildOutput getBuildOutput(nix::ref store, const Derivation & drv) product.subtype = output.first == "out" ? "" : output.first; product.name = storePathToName(product.path); - struct stat st; - if (stat(product.path.c_str(), &st)) - throw SysError(format("getting status of ‘%1%’") % product.path); - if (S_ISDIR(st.st_mode)) + auto st = accessor->stat(product.path); + if (st.type == FSAccessor::Type::tMissing) + throw Error(format("getting status of ‘%1%’") % product.path); + if (st.type == FSAccessor::Type::tDirectory) res.products.push_back(product); } } @@ -119,17 +105,18 @@ BuildOutput getBuildOutput(nix::ref store, const Derivation & drv) /* Get the release name from $output/nix-support/hydra-release-name. */ for (auto & output : outputs) { Path p = output + "/nix-support/hydra-release-name"; - if (!pathExists(p)) continue; + if (accessor->stat(p).type != FSAccessor::Type::tRegular) continue; try { - res.releaseName = trim(readFile(p)); + res.releaseName = trim(accessor->readFile(p)); } catch (Error & e) { continue; } // FIXME: validate release name } /* Get metrics. 
*/ for (auto & output : outputs) { - auto file = secureRead(output + "/nix-support/hydra-metrics"); - for (auto & line : tokenizeString(std::get<1>(file), "\n")) { + Path metricsFile = output + "/nix-support/hydra-metrics"; + if (accessor->stat(metricsFile).type != FSAccessor::Type::tRegular) continue; + for (auto & line : tokenizeString(accessor->readFile(metricsFile), "\n")) { auto fields = tokenizeString>(line); if (fields.size() < 2) continue; BuildMetric metric; diff --git a/src/hydra-queue-runner/build-result.hh b/src/hydra-queue-runner/build-result.hh index 7fdd659b..72e8b4df 100644 --- a/src/hydra-queue-runner/build-result.hh +++ b/src/hydra-queue-runner/build-result.hh @@ -4,6 +4,7 @@ #include "hash.hh" #include "derivations.hh" +#include "store-api.hh" struct BuildProduct { @@ -37,4 +38,5 @@ struct BuildOutput std::map metrics; }; -BuildOutput getBuildOutput(nix::ref store, const nix::Derivation & drv); +BuildOutput getBuildOutput(nix::ref store, + nix::ref accessor, const nix::Derivation & drv); diff --git a/src/hydra-queue-runner/builder.cc b/src/hydra-queue-runner/builder.cc index 557ccde9..9179fe42 100644 --- a/src/hydra-queue-runner/builder.cc +++ b/src/hydra-queue-runner/builder.cc @@ -15,8 +15,8 @@ void State::builder(MachineReservation::ptr reservation) auto step = reservation->step; try { - auto store = openStore(); // FIXME: pool - retry = doBuildStep(store, step, reservation->machine); + auto destStore = getDestStore(); + retry = doBuildStep(destStore, step, reservation->machine); } catch (std::exception & e) { printMsg(lvlError, format("uncaught exception building ‘%1%’ on ‘%2%’: %3%") % step->drvPath % reservation->machine->sshName % e.what()); @@ -45,7 +45,7 @@ void State::builder(MachineReservation::ptr reservation) } -bool State::doBuildStep(nix::ref store, Step::ptr step, +bool State::doBuildStep(nix::ref destStore, Step::ptr step, Machine::ptr machine) { { @@ -112,6 +112,7 @@ bool State::doBuildStep(nix::ref store, Step::ptr step, /* 
Create a build step record indicating that we started building. */ { + auto mc = startDbUpdate(); pqxx::work txn(*conn); stepNr = createBuildStep(txn, result.startTime, build, step, machine->sshName, bssBusy); txn.commit(); @@ -120,13 +121,14 @@ bool State::doBuildStep(nix::ref store, Step::ptr step, /* Do the build. */ try { /* FIXME: referring builds may have conflicting timeouts. */ - buildRemote(store, machine, step, build->maxSilentTime, build->buildTimeout, result); + buildRemote(destStore, machine, step, build->maxSilentTime, build->buildTimeout, result); } catch (Error & e) { result.status = BuildResult::MiscFailure; result.errorMsg = e.msg(); } - if (result.success()) res = getBuildOutput(store, step->drv); + if (result.success()) + res = getBuildOutput(destStore, ref(result.accessor), step->drv); } time_t stepStopTime = time(0); @@ -164,6 +166,7 @@ bool State::doBuildStep(nix::ref store, Step::ptr step, retry = step_->tries + 1 < maxTries; } if (retry) { + auto mc = startDbUpdate(); pqxx::work txn(*conn); finishBuildStep(txn, result.startTime, result.stopTime, result.overhead, build->id, stepNr, machine->sshName, bssAborted, result.errorMsg); @@ -212,6 +215,8 @@ bool State::doBuildStep(nix::ref store, Step::ptr step, /* Update the database. */ { + auto mc = startDbUpdate(); + pqxx::work txn(*conn); finishBuildStep(txn, result.startTime, result.stopTime, result.overhead, @@ -298,6 +303,8 @@ bool State::doBuildStep(nix::ref store, Step::ptr step, /* Update the database. 
*/ { + auto mc = startDbUpdate(); + pqxx::work txn(*conn); BuildStatus buildStatus = diff --git a/src/hydra-queue-runner/counter.hh b/src/hydra-queue-runner/counter.hh index 1943d1c3..6afff99d 100644 --- a/src/hydra-queue-runner/counter.hh +++ b/src/hydra-queue-runner/counter.hh @@ -1,6 +1,7 @@ #pragma once #include +#include typedef std::atomic counter; @@ -8,5 +9,9 @@ struct MaintainCount { counter & c; MaintainCount(counter & c) : c(c) { c++; } + MaintainCount(counter & c, std::function warn) : c(c) + { + warn(++c); + } ~MaintainCount() { auto prev = c--; assert(prev); } }; diff --git a/src/hydra-queue-runner/finally.hh b/src/hydra-queue-runner/finally.hh new file mode 100644 index 00000000..47c64dea --- /dev/null +++ b/src/hydra-queue-runner/finally.hh @@ -0,0 +1,12 @@ +#pragma once + +/* A trivial class to run a function at the end of a scope. */ +class Finally +{ +private: + std::function fun; + +public: + Finally(std::function fun) : fun(fun) { } + ~Finally() { fun(); } +}; diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 11709c31..d7e80f82 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -7,6 +7,8 @@ #include "state.hh" #include "build-result.hh" +#include "local-binary-cache-store.hh" +#include "s3-binary-cache-store.hh" #include "shared.hh" #include "globals.hh" @@ -20,10 +22,51 @@ State::State() hydraData = getEnv("HYDRA_DATA"); if (hydraData == "") throw Error("$HYDRA_DATA must be set"); + /* Read hydra.conf. 
*/ + auto hydraConfigFile = getEnv("HYDRA_CONFIG"); + if (pathExists(hydraConfigFile)) { + + for (auto line : tokenizeString(readFile(hydraConfigFile), "\n")) { + line = trim(string(line, 0, line.find('#'))); + + auto eq = line.find('='); + if (eq == std::string::npos) continue; + + auto key = trim(std::string(line, 0, eq)); + auto value = trim(std::string(line, eq + 1)); + + if (key == "") continue; + + hydraConfig[key] = value; + } + } + logDir = canonPath(hydraData + "/build-logs"); } +MaintainCount State::startDbUpdate() +{ + return MaintainCount(nrActiveDbUpdates, [](unsigned long c) { + if (c > 6) { + printMsg(lvlError, format("warning: %d concurrent database updates; PostgreSQL may be stalled") % c); + } + }); +} + + +ref State::getLocalStore() +{ + return ref(_localStore); +} + + +ref State::getDestStore() +{ + return ref(_destStore); +} + + void State::parseMachines(const std::string & contents) { Machines newMachines, oldMachines; @@ -94,7 +137,8 @@ void State::monitorMachinesFile() getEnv("NIX_REMOTE_SYSTEMS", pathExists(defaultMachinesFile) ? defaultMachinesFile : ""), ":"); if (machinesFiles.empty()) { - parseMachines("localhost " + settings.thisSystem + parseMachines("localhost " + + (settings.thisSystem == "x86_64-linux" ? 
"x86_64-linux,i686-linux" : settings.thisSystem) + " - " + std::to_string(settings.maxBuildJobs) + " 1"); return; } @@ -502,8 +546,8 @@ void State::dumpStatus(Connection & conn, bool log) root.attr("nrStepsCopyingTo", nrStepsCopyingTo); root.attr("nrStepsCopyingFrom", nrStepsCopyingFrom); root.attr("nrStepsWaiting", nrStepsWaiting); - root.attr("bytesSent"); out << bytesSent; - root.attr("bytesReceived"); out << bytesReceived; + root.attr("bytesSent", bytesSent); + root.attr("bytesReceived", bytesReceived); root.attr("nrBuildsRead", nrBuildsRead); root.attr("nrBuildsDone", nrBuildsDone); root.attr("nrStepsDone", nrStepsDone); @@ -512,12 +556,13 @@ void State::dumpStatus(Connection & conn, bool log) if (nrStepsDone) { root.attr("totalStepTime", totalStepTime); root.attr("totalStepBuildTime", totalStepBuildTime); - root.attr("avgStepTime"); out << (float) totalStepTime / nrStepsDone; - root.attr("avgStepBuildTime"); out << (float) totalStepBuildTime / nrStepsDone; + root.attr("avgStepTime", (float) totalStepTime / nrStepsDone); + root.attr("avgStepBuildTime", (float) totalStepBuildTime / nrStepsDone); } root.attr("nrQueueWakeups", nrQueueWakeups); root.attr("nrDispatcherWakeups", nrDispatcherWakeups); root.attr("nrDbConnections", dbPool.count()); + root.attr("nrActiveDbUpdates", nrActiveDbUpdates); { root.attr("machines"); JSONObject nested(out); @@ -535,8 +580,8 @@ void State::dumpStatus(Connection & conn, bool log) if (m->state->nrStepsDone) { nested2.attr("totalStepTime", s->totalStepTime); nested2.attr("totalStepBuildTime", s->totalStepBuildTime); - nested2.attr("avgStepTime"); out << (float) s->totalStepTime / s->nrStepsDone; - nested2.attr("avgStepBuildTime"); out << (float) s->totalStepBuildTime / s->nrStepsDone; + nested2.attr("avgStepTime", (float) s->totalStepTime / s->nrStepsDone); + nested2.attr("avgStepBuildTime", (float) s->totalStepBuildTime / s->nrStepsDone); } } } @@ -547,7 +592,7 @@ void State::dumpStatus(Connection & conn, bool log) for (auto & 
jobset : *jobsets_) { nested.attr(jobset.first.first + ":" + jobset.first.second); JSONObject nested2(out); - nested2.attr("shareUsed"); out << jobset.second->shareUsed(); + nested2.attr("shareUsed", jobset.second->shareUsed()); nested2.attr("seconds", jobset.second->getSeconds()); } } @@ -567,11 +612,67 @@ void State::dumpStatus(Connection & conn, bool log) nested2.attr("lastActive", std::chrono::system_clock::to_time_t(i.second.lastActive)); } } + + auto store = dynamic_cast(&*getDestStore()); + + if (store) { + root.attr("store"); + JSONObject nested(out); + + auto & stats = store->getStats(); + nested.attr("narInfoRead", stats.narInfoRead); + nested.attr("narInfoReadAverted", stats.narInfoReadAverted); + nested.attr("narInfoWrite", stats.narInfoWrite); + nested.attr("narInfoCacheSize", stats.narInfoCacheSize); + nested.attr("narRead", stats.narRead); + nested.attr("narReadBytes", stats.narReadBytes); + nested.attr("narReadCompressedBytes", stats.narReadCompressedBytes); + nested.attr("narWrite", stats.narWrite); + nested.attr("narWriteAverted", stats.narWriteAverted); + nested.attr("narWriteBytes", stats.narWriteBytes); + nested.attr("narWriteCompressedBytes", stats.narWriteCompressedBytes); + nested.attr("narWriteCompressionTimeMs", stats.narWriteCompressionTimeMs); + nested.attr("narCompressionSavings", + stats.narWriteBytes + ? 1.0 - (double) stats.narWriteCompressedBytes / stats.narWriteBytes + : 0.0); + nested.attr("narCompressionSpeed", // MiB/s + stats.narWriteCompressionTimeMs + ? (double) stats.narWriteBytes / stats.narWriteCompressionTimeMs * 1000.0 / (1024.0 * 1024.0) + : 0.0); + + auto s3Store = dynamic_cast(&*store); + if (s3Store) { + nested.attr("s3"); + JSONObject nested2(out); + auto & s3Stats = s3Store->getS3Stats(); + nested2.attr("put", s3Stats.put); + nested2.attr("putBytes", s3Stats.putBytes); + nested2.attr("putTimeMs", s3Stats.putTimeMs); + nested2.attr("putSpeed", + s3Stats.putTimeMs + ? 
(double) s3Stats.putBytes / s3Stats.putTimeMs * 1000.0 / (1024.0 * 1024.0) + : 0.0); + nested2.attr("get", s3Stats.get); + nested2.attr("getBytes", s3Stats.getBytes); + nested2.attr("getTimeMs", s3Stats.getTimeMs); + nested2.attr("getSpeed", + s3Stats.getTimeMs + ? (double) s3Stats.getBytes / s3Stats.getTimeMs * 1000.0 / (1024.0 * 1024.0) + : 0.0); + nested2.attr("head", s3Stats.head); + nested2.attr("costDollarApprox", + (s3Stats.get + s3Stats.head) / 10000.0 * 0.004 + + s3Stats.put / 1000.0 * 0.005 + + + s3Stats.getBytes / (1024.0 * 1024.0 * 1024.0) * 0.09); + } + } } if (log) printMsg(lvlInfo, format("status: %1%") % out.str()); { + auto mc = startDbUpdate(); pqxx::work txn(conn); // FIXME: use PostgreSQL 9.5 upsert. txn.exec("delete from SystemStatus where what = 'queue-runner'"); @@ -655,6 +756,40 @@ void State::run(BuildID buildOne) if (!lock) throw Error("hydra-queue-runner is already running"); + auto storeMode = hydraConfig["store_mode"]; + + _localStore = openStore(); + + if (storeMode == "direct" || storeMode == "") { + _destStore = _localStore; + } + + else if (storeMode == "local-binary-cache") { + auto dir = hydraConfig["binary_cache_dir"]; + if (dir == "") + throw Error("you must set ‘binary_cache_dir’ in hydra.conf"); + auto store = make_ref( + _localStore, + hydraConfig["binary_cache_secret_key_file"], + hydraConfig["binary_cache_public_key_file"], + dir); + store->init(); + _destStore = std::shared_ptr(store); + } + + else if (storeMode == "s3-binary-cache") { + auto bucketName = hydraConfig["binary_cache_s3_bucket"]; + if (bucketName == "") + throw Error("you must set ‘binary_cache_s3_bucket’ in hydra.conf"); + auto store = make_ref( + _localStore, + hydraConfig["binary_cache_secret_key_file"], + hydraConfig["binary_cache_public_key_file"], + bucketName); + store->init(); + _destStore = std::shared_ptr(store); + } + { auto conn(dbPool.get()); clearBusy(*conn, 0); @@ -679,10 +814,10 @@ void State::run(BuildID buildOne) while (true) { try { auto 
conn(dbPool.get()); - receiver dumpStatus(*conn, "dump_status"); + receiver dumpStatus_(*conn, "dump_status"); while (true) { bool timeout = conn->await_notification(300, 0) == 0; - State::dumpStatus(*conn, timeout); + dumpStatus(*conn, timeout); } } catch (std::exception & e) { printMsg(lvlError, format("main thread: %1%") % e.what()); diff --git a/src/hydra-queue-runner/pool.hh b/src/hydra-queue-runner/pool.hh deleted file mode 100644 index a1cd3977..00000000 --- a/src/hydra-queue-runner/pool.hh +++ /dev/null @@ -1,85 +0,0 @@ -#pragma once - -#include -#include - -#include "sync.hh" - -/* This template class implements a simple pool manager of resources - of some type R, such as database connections. It is used as - follows: - - class Connection { ... }; - - Pool pool; - - { - auto conn(pool.get()); - conn->exec("select ..."); - } - - Here, the Connection object referenced by ‘conn’ is automatically - returned to the pool when ‘conn’ goes out of scope. -*/ - -template -class Pool -{ -private: - struct State - { - unsigned int count = 0; - std::list> idle; - }; - - Sync state; - -public: - - class Handle - { - private: - Pool & pool; - std::shared_ptr r; - - friend Pool; - - Handle(Pool & pool, std::shared_ptr r) : pool(pool), r(r) { } - - public: - Handle(Handle && h) : pool(h.pool), r(h.r) { h.r.reset(); } - - Handle(const Handle & l) = delete; - - ~Handle() - { - auto state_(pool.state.lock()); - if (r) state_->idle.push_back(r); - } - - R * operator -> () { return r.get(); } - R & operator * () { return *r; } - }; - - Handle get() - { - { - auto state_(state.lock()); - if (!state_->idle.empty()) { - auto p = state_->idle.back(); - state_->idle.pop_back(); - return Handle(*this, p); - } - state_->count++; - } - /* Note: we don't hold the lock while creating a new instance, - because creation might take a long time. 
*/ - return Handle(*this, std::make_shared()); - } - - unsigned int count() - { - auto state_(state.lock()); - return state_->count; - } -}; diff --git a/src/hydra-queue-runner/queue-monitor.cc b/src/hydra-queue-runner/queue-monitor.cc index bf96f489..c9ea6da2 100644 --- a/src/hydra-queue-runner/queue-monitor.cc +++ b/src/hydra-queue-runner/queue-monitor.cc @@ -30,12 +30,13 @@ void State::queueMonitorLoop() receiver buildsBumped(*conn, "builds_bumped"); receiver jobsetSharesChanged(*conn, "jobset_shares_changed"); - auto store = openStore(); // FIXME: pool + auto localStore = getLocalStore(); + auto destStore = getDestStore(); unsigned int lastBuildId = 0; while (true) { - bool done = getQueuedBuilds(*conn, store, lastBuildId); + bool done = getQueuedBuilds(*conn, localStore, destStore, lastBuildId); /* Sleep until we get notification from the database about an event. */ @@ -63,7 +64,8 @@ void State::queueMonitorLoop() } -bool State::getQueuedBuilds(Connection & conn, ref store, unsigned int & lastBuildId) +bool State::getQueuedBuilds(Connection & conn, ref localStore, + ref destStore, unsigned int & lastBuildId) { printMsg(lvlInfo, format("checking the queue for builds > %1%...") % lastBuildId); @@ -118,10 +120,11 @@ bool State::getQueuedBuilds(Connection & conn, ref store, unsigned int & nrAdded++; newBuildsByID.erase(build->id); - if (!store->isValidPath(build->drvPath)) { + if (!localStore->isValidPath(build->drvPath)) { /* Derivation has been GC'ed prematurely. */ printMsg(lvlError, format("aborting GC'ed build %1%") % build->id); if (!build->finishedInDB) { + auto mc = startDbUpdate(); pqxx::work txn(conn); txn.parameterized ("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1 and finished = 0") @@ -138,7 +141,8 @@ bool State::getQueuedBuilds(Connection & conn, ref store, unsigned int & std::set newSteps; std::set finishedDrvs; // FIXME: re-use? 
- Step::ptr step = createStep(store, conn, build, build->drvPath, build, 0, finishedDrvs, newSteps, newRunnable); + Step::ptr step = createStep(destStore, conn, build, build->drvPath, + build, 0, finishedDrvs, newSteps, newRunnable); /* Some of the new steps may be the top level of builds that we haven't processed yet. So do them now. This ensures that @@ -156,12 +160,15 @@ bool State::getQueuedBuilds(Connection & conn, ref store, unsigned int & all valid. So we mark this as a finished, cached build. */ if (!step) { Derivation drv = readDerivation(build->drvPath); - BuildOutput res = getBuildOutput(store, drv); + BuildOutput res = getBuildOutput(destStore, destStore->getFSAccessor(), drv); + { + auto mc = startDbUpdate(); pqxx::work txn(conn); time_t now = time(0); markSucceededBuild(txn, build, res, true, now, now); txn.commit(); + } build->finishedInDB = true; @@ -175,6 +182,7 @@ bool State::getQueuedBuilds(Connection & conn, ref store, unsigned int & if (checkCachedFailure(r, conn)) { printMsg(lvlError, format("marking build %1% as cached failure") % build->id); if (!build->finishedInDB) { + auto mc = startDbUpdate(); pqxx::work txn(conn); /* Find the previous build step record, first by @@ -314,7 +322,7 @@ void State::processQueueChange(Connection & conn) } -Step::ptr State::createStep(ref store, +Step::ptr State::createStep(ref destStore, Connection & conn, Build::ptr build, const Path & drvPath, Build::ptr referringBuild, Step::ptr referringStep, std::set & finishedDrvs, std::set & newSteps, std::set & newRunnable) @@ -394,7 +402,7 @@ Step::ptr State::createStep(ref store, DerivationOutputs missing; PathSet missingPaths; for (auto & i : step->drv.outputs) - if (!store->isValidPath(i.second.path)) { + if (!destStore->isValidPath(i.second.path)) { valid = false; missing[i.first] = i.second; missingPaths.insert(i.second.path); @@ -406,7 +414,7 @@ Step::ptr State::createStep(ref store, assert(missing.size() == missingPaths.size()); if (!missing.empty() && 
settings.useSubstitutes) { SubstitutablePathInfos infos; - store->querySubstitutablePathInfos(missingPaths, infos); + destStore->querySubstitutablePathInfos(missingPaths, infos); // FIXME if (infos.size() == missingPaths.size()) { valid = true; for (auto & i : missing) { @@ -414,10 +422,11 @@ Step::ptr State::createStep(ref store, printMsg(lvlInfo, format("substituting output ‘%1%’ of ‘%2%’") % i.second.path % drvPath); time_t startTime = time(0); - store->ensurePath(i.second.path); + destStore->ensurePath(i.second.path); time_t stopTime = time(0); { + auto mc = startDbUpdate(); pqxx::work txn(conn); createSubstitutionStep(txn, startTime, stopTime, build, drvPath, "out", i.second.path); txn.commit(); @@ -443,7 +452,7 @@ Step::ptr State::createStep(ref store, /* Create steps for the dependencies. */ for (auto & i : step->drv.inputDrvs) { - auto dep = createStep(store, conn, build, i.first, 0, step, finishedDrvs, newSteps, newRunnable); + auto dep = createStep(destStore, conn, build, i.first, 0, step, finishedDrvs, newSteps, newRunnable); if (dep) { auto step_(step->state.lock()); step_->deps.insert(dep); diff --git a/src/hydra-queue-runner/s3-binary-cache-store.cc b/src/hydra-queue-runner/s3-binary-cache-store.cc new file mode 100644 index 00000000..146c26fc --- /dev/null +++ b/src/hydra-queue-runner/s3-binary-cache-store.cc @@ -0,0 +1,180 @@ +#include "s3-binary-cache-store.hh" + +#include "nar-info.hh" + +#include +#include +#include +#include +#include +#include +#include + +namespace nix { + +struct S3Error : public Error +{ + Aws::S3::S3Errors err; + S3Error(Aws::S3::S3Errors err, const FormatOrString & fs) + : Error(fs), err(err) { }; +}; + +/* Helper: given an Outcome, return R in case of success, or + throw an exception in case of an error. 
*/ +template +R && checkAws(const FormatOrString & fs, Aws::Utils::Outcome && outcome) +{ + if (!outcome.IsSuccess()) + throw S3Error( + outcome.GetError().GetErrorType(), + fs.s + ": " + outcome.GetError().GetMessage()); + return outcome.GetResultWithOwnership(); +} + +S3BinaryCacheStore::S3BinaryCacheStore(std::shared_ptr localStore, + const Path & secretKeyFile, const Path & publicKeyFile, + const std::string & bucketName) + : BinaryCacheStore(localStore, secretKeyFile, publicKeyFile) + , bucketName(bucketName) + , config(makeConfig()) + , client(make_ref(*config)) +{ +} + +ref S3BinaryCacheStore::makeConfig() +{ + auto res = make_ref(); + res->region = Aws::Region::US_EAST_1; + res->requestTimeoutMs = 600 * 1000; + return res; +} + +void S3BinaryCacheStore::init() +{ + /* Create the bucket if it doesn't already exists. */ + // FIXME: HeadBucket would be more appropriate, but doesn't return + // an easily parsed 404 message. + auto res = client->GetBucketLocation( + Aws::S3::Model::GetBucketLocationRequest().WithBucket(bucketName)); + + if (!res.IsSuccess()) { + if (res.GetError().GetErrorType() != Aws::S3::S3Errors::NO_SUCH_BUCKET) + throw Error(format("AWS error checking bucket ‘%s’: %s") % bucketName % res.GetError().GetMessage()); + + checkAws(format("AWS error creating bucket ‘%s’") % bucketName, + client->CreateBucket( + Aws::S3::Model::CreateBucketRequest() + .WithBucket(bucketName) + .WithCreateBucketConfiguration( + Aws::S3::Model::CreateBucketConfiguration() + /* .WithLocationConstraint( + Aws::S3::Model::BucketLocationConstraint::US) */ ))); + } + + BinaryCacheStore::init(); +} + +const S3BinaryCacheStore::Stats & S3BinaryCacheStore::getS3Stats() +{ + return stats; +} + +/* This is a specialisation of isValidPath() that optimistically + fetches the .narinfo file, rather than first checking for its + existence via a HEAD request. Since .narinfos are small, doing a + GET is unlikely to be slower than HEAD. 
*/ +bool S3BinaryCacheStore::isValidPath(const Path & storePath) +{ + try { + readNarInfo(storePath); + return true; + } catch (S3Error & e) { + if (e.err == Aws::S3::S3Errors::NO_SUCH_KEY) return false; + throw; + } +} + +bool S3BinaryCacheStore::fileExists(const std::string & path) +{ + stats.head++; + + auto res = client->HeadObject( + Aws::S3::Model::HeadObjectRequest() + .WithBucket(bucketName) + .WithKey(path)); + + if (!res.IsSuccess()) { + auto & error = res.GetError(); + if (error.GetErrorType() == Aws::S3::S3Errors::UNKNOWN // FIXME + && error.GetMessage().find("404") != std::string::npos) + return false; + throw Error(format("AWS error fetching ‘%s’: %s") % path % error.GetMessage()); + } + + return true; +} + +void S3BinaryCacheStore::upsertFile(const std::string & path, const std::string & data) +{ + auto request = + Aws::S3::Model::PutObjectRequest() + .WithBucket(bucketName) + .WithKey(path); + + auto stream = std::make_shared(data); + + request.SetBody(stream); + + stats.put++; + stats.putBytes += data.size(); + + auto now1 = std::chrono::steady_clock::now(); + + auto result = checkAws(format("AWS error uploading ‘%s’") % path, + client->PutObject(request)); + + auto now2 = std::chrono::steady_clock::now(); + + auto duration = std::chrono::duration_cast(now2 - now1).count(); + + printMsg(lvlInfo, format("uploaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms") + % bucketName % path % data.size() % duration); + + stats.putTimeMs += duration; +} + +std::string S3BinaryCacheStore::getFile(const std::string & path) +{ + auto request = + Aws::S3::Model::GetObjectRequest() + .WithBucket(bucketName) + .WithKey(path); + + request.SetResponseStreamFactory([&]() { + return Aws::New("STRINGSTREAM"); + }); + + stats.get++; + + auto now1 = std::chrono::steady_clock::now(); + + auto result = checkAws(format("AWS error fetching ‘%s’") % path, + client->GetObject(request)); + + auto now2 = std::chrono::steady_clock::now(); + + auto res = dynamic_cast(result.GetBody()).str(); + 
+ auto duration = std::chrono::duration_cast(now2 - now1).count(); + + printMsg(lvlTalkative, format("downloaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms") + % bucketName % path % res.size() % duration); + + stats.getBytes += res.size(); + stats.getTimeMs += duration; + + return res; +} + +} + diff --git a/src/hydra-queue-runner/s3-binary-cache-store.hh b/src/hydra-queue-runner/s3-binary-cache-store.hh new file mode 100644 index 00000000..1ba78dce --- /dev/null +++ b/src/hydra-queue-runner/s3-binary-cache-store.hh @@ -0,0 +1,60 @@ +#pragma once + +#include "binary-cache-store.hh" + +#include + +namespace Aws { namespace Client { class ClientConfiguration; } } +namespace Aws { namespace S3 { class S3Client; } } + +namespace nix { + +class S3BinaryCacheStore : public BinaryCacheStore +{ +private: + + std::string bucketName; + + ref config; + ref client; + +public: + + S3BinaryCacheStore(std::shared_ptr localStore, + const Path & secretKeyFile, const Path & publicKeyFile, + const std::string & bucketName); + + void init() override; + + struct Stats + { + std::atomic put{0}; + std::atomic putBytes{0}; + std::atomic putTimeMs{0}; + std::atomic get{0}; + std::atomic getBytes{0}; + std::atomic getTimeMs{0}; + std::atomic head{0}; + }; + + const Stats & getS3Stats(); + + bool isValidPath(const Path & storePath) override; + +private: + + Stats stats; + + ref makeConfig(); + +protected: + + bool fileExists(const std::string & path) override; + + void upsertFile(const std::string & path, const std::string & data) override; + + std::string getFile(const std::string & path) override; + +}; + +} diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index 9b710808..be3ead24 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -16,6 +16,8 @@ #include "store-api.hh" #include "derivations.hh" +#include "binary-cache-store.hh" // FIXME + typedef unsigned int BuildID; @@ -51,6 +53,7 @@ struct RemoteResult : nix::BuildResult time_t 
startTime = 0, stopTime = 0; unsigned int overhead = 0; nix::Path logFile; + std::shared_ptr accessor; bool canRetry() { @@ -78,7 +81,7 @@ private: std::atomic shares{1}; /* The start time and duration of the most recent build steps. */ - Sync> steps; + nix::Sync> steps; public: @@ -185,7 +188,7 @@ struct Step std::atomic_bool finished{false}; // debugging - Sync state; + nix::Sync state; ~Step() { @@ -225,7 +228,7 @@ struct Machine system_time lastFailure, disabledUntil; unsigned int consecutiveFailures; }; - Sync connectInfo; + nix::Sync connectInfo; /* Mutex to prevent multiple threads from sending data to the same machine (which would be inefficient). */ @@ -260,35 +263,37 @@ private: nix::Path hydraData, logDir; + std::map hydraConfig; + /* The queued builds. */ typedef std::map Builds; - Sync builds; + nix::Sync builds; /* The jobsets. */ typedef std::map, Jobset::ptr> Jobsets; - Sync jobsets; + nix::Sync jobsets; /* All active or pending build steps (i.e. dependencies of the queued builds). Note that these are weak pointers. Steps are kept alive by being reachable from Builds or by being in progress. */ typedef std::map Steps; - Sync steps; + nix::Sync steps; /* Build steps that have no unbuilt dependencies. */ typedef std::list Runnable; - Sync runnable; + nix::Sync runnable; /* CV for waking up the dispatcher. */ - Sync dispatcherWakeup; - std::condition_variable_any dispatcherWakeupCV; + nix::Sync dispatcherWakeup; + std::condition_variable dispatcherWakeupCV; /* PostgreSQL connection pool. */ - Pool dbPool; + nix::Pool dbPool; /* The build machines. */ typedef std::map Machines; - Sync machines; // FIXME: use atomic_shared_ptr + nix::Sync machines; // FIXME: use atomic_shared_ptr /* Various stats. */ time_t startedAt; @@ -308,18 +313,19 @@ private: counter nrDispatcherWakeups{0}; counter bytesSent{0}; counter bytesReceived{0}; + counter nrActiveDbUpdates{0}; /* Log compressor work queue. 
*/ - Sync> logCompressorQueue; - std::condition_variable_any logCompressorWakeup; + nix::Sync> logCompressorQueue; + std::condition_variable logCompressorWakeup; /* Notification sender work queue. FIXME: if hydra-queue-runner is killed before it has finished sending notifications about a build, then the notifications may be lost. It would be better to mark builds with pending notification in the database. */ typedef std::pair> NotificationItem; - Sync> notificationSenderQueue; - std::condition_variable_any notificationSenderWakeup; + nix::Sync> notificationSenderQueue; + std::condition_variable notificationSenderWakeup; /* Specific build to do for --build-one (testing only). */ BuildID buildOne; @@ -332,7 +338,7 @@ private: std::chrono::seconds waitTime; // time runnable steps have been waiting }; - Sync> machineTypes; + nix::Sync> machineTypes; struct MachineReservation { @@ -346,11 +352,23 @@ private: std::atomic lastDispatcherCheck{0}; + std::shared_ptr _localStore; + std::shared_ptr _destStore; + public: State(); private: + MaintainCount startDbUpdate(); + + /* Return a store object that can access derivations produced by + hydra-evaluator. */ + nix::ref getLocalStore(); + + /* Return a store object to store build results. */ + nix::ref getDestStore(); + void clearBusy(Connection & conn, time_t stopTime); void parseMachines(const std::string & contents); @@ -379,7 +397,8 @@ private: void queueMonitorLoop(); /* Check the queue for new builds. */ - bool getQueuedBuilds(Connection & conn, nix::ref store, unsigned int & lastBuildId); + bool getQueuedBuilds(Connection & conn, nix::ref localStore, + nix::ref destStore, unsigned int & lastBuildId); /* Handle cancellation, deletion and priority bumps. */ void processQueueChange(Connection & conn); @@ -407,10 +426,10 @@ private: /* Perform the given build step. Return true if the step is to be retried. 
*/ - bool doBuildStep(nix::ref store, Step::ptr step, + bool doBuildStep(nix::ref destStore, Step::ptr step, Machine::ptr machine); - void buildRemote(nix::ref store, + void buildRemote(nix::ref destStore, Machine::ptr machine, Step::ptr step, unsigned int maxSilentTime, unsigned int buildTimeout, RemoteResult & result); diff --git a/src/hydra-queue-runner/sync.hh b/src/hydra-queue-runner/sync.hh deleted file mode 100644 index 1573f091..00000000 --- a/src/hydra-queue-runner/sync.hh +++ /dev/null @@ -1,74 +0,0 @@ -#pragma once - -#include -#include -#include - -/* This template class ensures synchronized access to a value of type - T. It is used as follows: - - struct Data { int x; ... }; - - Sync data; - - { - auto data_(data.lock()); - data_->x = 123; - } - - Here, "data" is automatically unlocked when "data_" goes out of - scope. -*/ - -template -class Sync -{ -private: - std::mutex mutex; - T data; - -public: - - Sync() { } - Sync(const T & data) : data(data) { } - - class Lock - { - private: - Sync * s; - friend Sync; - Lock(Sync * s) : s(s) { s->mutex.lock(); } - public: - Lock(Lock && l) : s(l.s) { l.s = 0; } - Lock(const Lock & l) = delete; - ~Lock() { if (s) s->mutex.unlock(); } - T * operator -> () { return &s->data; } - T & operator * () { return s->data; } - - /* FIXME: performance impact of condition_variable_any? 
*/ - void wait(std::condition_variable_any & cv) - { - assert(s); - cv.wait(s->mutex); - } - - template - bool wait_for(std::condition_variable_any & cv, - const std::chrono::duration & duration, - Predicate pred) - { - assert(s); - return cv.wait_for(s->mutex, duration, pred); - } - - template - std::cv_status wait_until(std::condition_variable_any & cv, - const std::chrono::time_point & duration) - { - assert(s); - return cv.wait_until(s->mutex, duration); - } - }; - - Lock lock() { return Lock(this); } -}; diff --git a/src/hydra-queue-runner/token-server.hh b/src/hydra-queue-runner/token-server.hh index 2ff748e3..d4f5f843 100644 --- a/src/hydra-queue-runner/token-server.hh +++ b/src/hydra-queue-runner/token-server.hh @@ -14,7 +14,7 @@ class TokenServer unsigned int maxTokens; Sync curTokens{0}; - std::condition_variable_any wakeup; + std::condition_variable wakeup; public: TokenServer(unsigned int maxTokens) : maxTokens(maxTokens) { } diff --git a/src/lib/Hydra/Controller/Build.pm b/src/lib/Hydra/Controller/Build.pm index e2648b53..37d265f0 100644 --- a/src/lib/Hydra/Controller/Build.pm +++ b/src/lib/Hydra/Controller/Build.pm @@ -64,7 +64,10 @@ sub build_GET { my $build = $c->stash->{build}; $c->stash->{template} = 'build.tt'; - $c->stash->{available} = all { isValidPath($_->path) } $build->buildoutputs->all; + $c->stash->{available} = + ($c->config->{store_mode} // "direct") eq "direct" + ? all { isValidPath($_->path) } $build->buildoutputs->all + : 1; # FIXME $c->stash->{drvAvailable} = isValidPath $build->drvpath; if ($build->finished && $build->iscachedbuild) { @@ -219,7 +222,33 @@ sub download : Chained('buildChain') PathPart { } notFound($c, "Build doesn't have a product $productRef.") if !defined $product; - notFound($c, "Build product " . $product->path . " has disappeared.") unless -e $product->path; + if ($product->path !~ /^($Nix::Config::storeDir\/[^\/]+)/) { + die "Invalid store path " . $product->path . 
".\n"; + } + my $storePath = $1; + + # Hack to get downloads to work on binary cache stores: if the + # store path is not available locally, then import it into the + # local store. FIXME: find a better way; this can require an + # unbounded amount of space. + if (!isValidPath($storePath)) { + my $storeMode = $c->config->{store_mode} // "direct"; + notFound($c, "File " . $product->path . " has disappeared.") + if $storeMode eq "direct"; + my $url = + $storeMode eq "local-binary-cache" ? "file://" . $c->config->{binary_cache_dir} : + $storeMode eq "s3-binary-cache" ? "https://" . $c->config->{binary_cache_s3_bucket} . ".s3.amazonaws.com/" : + die; + my $args = ""; + if (defined $c->config->{binary_cache_public_key_file} + && -r $c->config->{binary_cache_public_key_file}) + { + $args = "--option binary-cache-public-keys " . read_file($c->config->{binary_cache_public_key_file}); + } + system("nix-store --realise '$storePath' --option extra-binary-caches '$url' $args>/dev/null"); + } + + notFound($c, "File " . $product->path . " does not exist.") unless -e $product->path; return $c->res->redirect(defaultUriForProduct($self, $c, $product, @path)) if scalar @path == 0 && ($product->name || $product->defaultpath); diff --git a/src/lib/Hydra/Controller/Root.pm b/src/lib/Hydra/Controller/Root.pm index dab82b62..8827d4f0 100644 --- a/src/lib/Hydra/Controller/Root.pm +++ b/src/lib/Hydra/Controller/Root.pm @@ -118,6 +118,22 @@ sub status_GET { } +sub queue_runner_status :Local :Path('queue-runner-status') :Args(0) :ActionClass('REST') { } + +sub queue_runner_status_GET { + my ($self, $c) = @_; + + #my $status = from_json($c->model('DB::SystemStatus')->find('queue-runner')->status); + my $status = from_json(`hydra-queue-runner --status`); + if ($?) 
{ $status->{status} = "unknown"; } + my $json = JSON->new->pretty()->canonical(); + + $c->stash->{template} = 'queue-runner-status.tt'; + $c->stash->{status} = $json->encode($status); + $self->status_ok($c, entity => $status); +} + + sub machines :Local Args(0) { my ($self, $c) = @_; my $machines = getMachines; @@ -241,48 +257,90 @@ sub serialize : ActionClass('Serialize') { } sub nar :Local :Args(1) { my ($self, $c, $path) = @_; - $path = ($ENV{NIX_STORE_DIR} || "/nix/store")."/$path"; + die if $path =~ /\//; - gone($c, "Path " . $path . " is no longer available.") unless isValidPath($path); + my $storeMode = $c->config->{store_mode} // "direct"; - $c->stash->{current_view} = 'NixNAR'; - $c->stash->{storePath} = $path; + if ($storeMode eq "s3-binary-cache") { + notFound($c, "There is no binary cache here."); + } + + elsif ($storeMode eq "local-binary-cache") { + my $dir = $c->config->{binary_cache_dir}; + $c->serve_static_file($dir . "/nar/" . $path); + } + + else { + $path = $Nix::Config::storeDir . "/$path"; + + gone($c, "Path " . $path . " is no longer available.") unless isValidPath($path); + + $c->stash->{current_view} = 'NixNAR'; + $c->stash->{storePath} = $path; + } } sub nix_cache_info :Path('nix-cache-info') :Args(0) { my ($self, $c) = @_; - $c->response->content_type('text/plain'); - $c->stash->{plain}->{data} = - #"StoreDir: $Nix::Config::storeDir\n" . # FIXME - "StoreDir: " . ($ENV{NIX_STORE_DIR} || "/nix/store") . "\n" . - "WantMassQuery: 0\n" . - # Give Hydra binary caches a very low priority (lower than the - # static binary cache http://nixos.org/binary-cache). - "Priority: 100\n"; - setCacheHeaders($c, 24 * 60 * 60); - $c->forward('Hydra::View::Plain'); + + my $storeMode = $c->config->{store_mode} // "direct"; + + if ($storeMode eq "s3-binary-cache") { + notFound($c, "There is no binary cache here."); + } + + elsif ($storeMode eq "local-binary-cache") { + my $dir = $c->config->{binary_cache_dir}; + $c->serve_static_file($dir . 
"/nix-cache-info"); + } + + else { + $c->response->content_type('text/plain'); + $c->stash->{plain}->{data} = + "StoreDir: $Nix::Config::storeDir\n" . + "WantMassQuery: 0\n" . + # Give Hydra binary caches a very low priority (lower than the + # static binary cache http://nixos.org/binary-cache). + "Priority: 100\n"; + setCacheHeaders($c, 24 * 60 * 60); + $c->forward('Hydra::View::Plain'); + } } sub narinfo :LocalRegex('^([a-z0-9]+).narinfo$') :Args(0) { my ($self, $c) = @_; - my $hash = $c->req->captures->[0]; - die if length($hash) != 32; - my $path = queryPathFromHashPart($hash); + my $storeMode = $c->config->{store_mode} // "direct"; - if (!$path) { - $c->response->status(404); - $c->response->content_type('text/plain'); - $c->stash->{plain}->{data} = "does not exist\n"; - $c->forward('Hydra::View::Plain'); - setCacheHeaders($c, 60 * 60); - return; + if ($storeMode eq "s3-binary-cache") { + notFound($c, "There is no binary cache here."); } - $c->stash->{storePath} = $path; - $c->forward('Hydra::View::NARInfo'); + elsif ($storeMode eq "local-binary-cache") { + my $dir = $c->config->{binary_cache_dir}; + $c->serve_static_file($dir . "/" . $c->req->captures->[0] . ".narinfo"); + } + + else { + my $hash = $c->req->captures->[0]; + + die if length($hash) != 32; + my $path = queryPathFromHashPart($hash); + + if (!$path) { + $c->response->status(404); + $c->response->content_type('text/plain'); + $c->stash->{plain}->{data} = "does not exist\n"; + $c->forward('Hydra::View::Plain'); + setCacheHeaders($c, 60 * 60); + return; + } + + $c->stash->{storePath} = $path; + $c->forward('Hydra::View::NARInfo'); + } } diff --git a/src/lib/Hydra/View/NARInfo.pm b/src/lib/Hydra/View/NARInfo.pm index d04b9e09..4afe4041 100644 --- a/src/lib/Hydra/View/NARInfo.pm +++ b/src/lib/Hydra/View/NARInfo.pm @@ -36,13 +36,10 @@ sub process { # Optionally, sign the NAR info file we just created. 
my $secretKeyFile = $c->config->{binary_cache_secret_key_file}; if (defined $secretKeyFile) { - my $s = readFile $secretKeyFile; - chomp $s; - my ($keyName, $secretKey) = split ":", $s; - die "invalid secret key file\n" unless defined $keyName && defined $secretKey; + my $secretKey = readFile $secretKeyFile; my $fingerprint = fingerprintPath($storePath, $narHash, $narSize, $refs); - my $sig = encode_base64(signString(decode_base64($secretKey), $fingerprint), ""); - $info .= "Sig: $keyName:$sig\n"; + my $sig = signString($secretKey, $fingerprint); + $info .= "Sig: $sig\n"; } setCacheHeaders($c, 24 * 60 * 60); diff --git a/src/root/queue-runner-status.tt b/src/root/queue-runner-status.tt new file mode 100644 index 00000000..d2896973 --- /dev/null +++ b/src/root/queue-runner-status.tt @@ -0,0 +1,8 @@ +[% WRAPPER layout.tt title="Queue runner status" %] +[% PROCESS common.tt %] + +
+[% HTML.escape(status) %]
+
+ +[% END %] diff --git a/tests/jobs/succeed-with-failed.sh b/tests/jobs/succeed-with-failed.sh index 51f79931..bd1214b9 100755 --- a/tests/jobs/succeed-with-failed.sh +++ b/tests/jobs/succeed-with-failed.sh @@ -1,3 +1,3 @@ #! /bin/sh -mkdir -p $out/nix-support/failed - +mkdir -p $out/nix-support +touch $out/nix-support/failed