Merge branch 'binary-cache'

This commit is contained in:
Eelco Dolstra 2016-02-29 18:29:07 +01:00
commit bc958c508b
22 changed files with 673 additions and 313 deletions

View file

@ -291,11 +291,14 @@ in
IN_SYSTEMD = "1"; # to get log severity levels IN_SYSTEMD = "1"; # to get log severity levels
}; };
serviceConfig = serviceConfig =
{ ExecStartPre = "${cfg.package}/bin/hydra-queue-runner --unlock"; { ExecStart = "@${cfg.package}/bin/hydra-queue-runner hydra-queue-runner -v --option build-use-substitutes false";
ExecStart = "@${cfg.package}/bin/hydra-queue-runner hydra-queue-runner -v";
ExecStopPost = "${cfg.package}/bin/hydra-queue-runner --unlock"; ExecStopPost = "${cfg.package}/bin/hydra-queue-runner --unlock";
User = "hydra-queue-runner"; User = "hydra-queue-runner";
Restart = "always"; Restart = "always";
# Ensure we can get core dumps.
LimitCORE = "infinity";
WorkingDirectory = "${baseDir}/queue-runner";
}; };
}; };

View file

@ -159,6 +159,10 @@ rec {
guile # optional, for Guile + Guix support guile # optional, for Guile + Guix support
perlDeps perl perlDeps perl
postgresql92 # for running the tests postgresql92 # for running the tests
(aws-sdk-cpp.override {
apis = ["s3"];
customMemoryManagement = false;
})
]; ];
hydraPath = lib.makeSearchPath "bin" ( hydraPath = lib.makeSearchPath "bin" (

View file

@ -2,7 +2,9 @@ bin_PROGRAMS = hydra-queue-runner
hydra_queue_runner_SOURCES = hydra-queue-runner.cc queue-monitor.cc dispatcher.cc \ hydra_queue_runner_SOURCES = hydra-queue-runner.cc queue-monitor.cc dispatcher.cc \
builder.cc build-result.cc build-remote.cc \ builder.cc build-result.cc build-remote.cc \
build-result.hh counter.hh pool.hh sync.hh token-server.hh state.hh db.hh build-result.hh counter.hh token-server.hh state.hh db.hh \
s3-binary-cache-store.hh s3-binary-cache-store.cc \
finally.hh
hydra_queue_runner_LDADD = $(NIX_LIBS) -lpqxx hydra_queue_runner_LDADD = $(NIX_LIBS) -lpqxx
AM_CXXFLAGS = $(NIX_CFLAGS) -Wall AM_CXXFLAGS = $(NIX_CFLAGS) -Wall -laws-cpp-sdk-s3

View file

@ -8,6 +8,7 @@
#include "state.hh" #include "state.hh"
#include "util.hh" #include "util.hh"
#include "worker-protocol.hh" #include "worker-protocol.hh"
#include "finally.hh"
using namespace nix; using namespace nix;
@ -73,19 +74,19 @@ static void openConnection(Machine::ptr machine, Path tmpDir, int stderrFD, Chil
} }
static void copyClosureTo(ref<Store> store, static void copyClosureTo(ref<Store> destStore,
FdSource & from, FdSink & to, const PathSet & paths, FdSource & from, FdSink & to, const PathSet & paths,
counter & bytesSent,
bool useSubstitutes = false) bool useSubstitutes = false)
{ {
PathSet closure; PathSet closure;
for (auto & path : paths) for (auto & path : paths)
store->computeFSClosure(path, closure); destStore->computeFSClosure(path, closure);
/* Send the "query valid paths" command with the "lock" option /* Send the "query valid paths" command with the "lock" option
enabled. This prevents a race where the remote host enabled. This prevents a race where the remote host
garbage-collect paths that are already there. Optionally, ask garbage-collect paths that are already there. Optionally, ask
the remote host to substitute missing paths. */ the remote host to substitute missing paths. */
// FIXME: substitute output pollutes our build log
to << cmdQueryValidPaths << 1 << useSubstitutes << closure; to << cmdQueryValidPaths << 1 << useSubstitutes << closure;
to.flush(); to.flush();
@ -95,7 +96,7 @@ static void copyClosureTo(ref<Store> store,
if (present.size() == closure.size()) return; if (present.size() == closure.size()) return;
Paths sorted = store->topoSortPaths(closure); Paths sorted = destStore->topoSortPaths(closure);
Paths missing; Paths missing;
for (auto i = sorted.rbegin(); i != sorted.rend(); ++i) for (auto i = sorted.rbegin(); i != sorted.rend(); ++i)
@ -103,11 +104,8 @@ static void copyClosureTo(ref<Store> store,
printMsg(lvlDebug, format("sending %1% missing paths") % missing.size()); printMsg(lvlDebug, format("sending %1% missing paths") % missing.size());
for (auto & p : missing)
bytesSent += store->queryPathInfo(p).narSize;
to << cmdImportPaths; to << cmdImportPaths;
store->exportPaths(missing, false, to); destStore->exportPaths(missing, false, to);
to.flush(); to.flush();
if (readInt(from) != 1) if (readInt(from) != 1)
@ -115,19 +113,7 @@ static void copyClosureTo(ref<Store> store,
} }
static void copyClosureFrom(ref<Store> store, void State::buildRemote(ref<Store> destStore,
FdSource & from, FdSink & to, const PathSet & paths, counter & bytesReceived)
{
to << cmdExportPaths << 0 << paths;
to.flush();
store->importPaths(false, from);
for (auto & p : paths)
bytesReceived += store->queryPathInfo(p).narSize;
}
void State::buildRemote(ref<Store> store,
Machine::ptr machine, Step::ptr step, Machine::ptr machine, Step::ptr step,
unsigned int maxSilentTime, unsigned int buildTimeout, unsigned int maxSilentTime, unsigned int buildTimeout,
RemoteResult & result) RemoteResult & result)
@ -152,6 +138,11 @@ void State::buildRemote(ref<Store> store,
FdSource from(child.from); FdSource from(child.from);
FdSink to(child.to); FdSink to(child.to);
Finally updateStats([&]() {
bytesReceived += from.read;
bytesSent += to.written;
});
/* Handshake. */ /* Handshake. */
bool sendDerivation = true; bool sendDerivation = true;
unsigned int remoteVersion; unsigned int remoteVersion;
@ -222,8 +213,14 @@ void State::buildRemote(ref<Store> store,
} }
} }
/* Ensure that the inputs exist in the destination store. This is
a no-op for regular stores, but for the binary cache store,
this will copy the inputs to the binary cache from the local
store. */
destStore->buildPaths(basicDrv.inputSrcs);
/* Copy the input closure. */ /* Copy the input closure. */
if (machine->sshName != "localhost") { if (/* machine->sshName != "localhost" */ true) {
auto mc1 = std::make_shared<MaintainCount>(nrStepsWaiting); auto mc1 = std::make_shared<MaintainCount>(nrStepsWaiting);
std::lock_guard<std::mutex> sendLock(machine->state->sendLock); std::lock_guard<std::mutex> sendLock(machine->state->sendLock);
mc1.reset(); mc1.reset();
@ -232,7 +229,7 @@ void State::buildRemote(ref<Store> store,
auto now1 = std::chrono::steady_clock::now(); auto now1 = std::chrono::steady_clock::now();
copyClosureTo(store, from, to, inputs, bytesSent); copyClosureTo(destStore, from, to, inputs, true);
auto now2 = std::chrono::steady_clock::now(); auto now2 = std::chrono::steady_clock::now();
@ -279,21 +276,26 @@ void State::buildRemote(ref<Store> store,
/* If the path was substituted or already valid, then we didn't /* If the path was substituted or already valid, then we didn't
get a build log. */ get a build log. */
if (result.status == BuildResult::Substituted || result.status == BuildResult::AlreadyValid) { if (result.status == BuildResult::Substituted || result.status == BuildResult::AlreadyValid) {
printMsg(lvlInfo, format("outputs of %1% substituted or already valid on %2%") % step->drvPath % machine->sshName);
unlink(result.logFile.c_str()); unlink(result.logFile.c_str());
result.logFile = ""; result.logFile = "";
} }
/* Copy the output paths. */ /* Copy the output paths. */
if (machine->sshName != "localhost") { if (/* machine->sshName != "localhost" */ true) {
printMsg(lvlDebug, format("copying outputs of %1% from %2%") % step->drvPath % machine->sshName); printMsg(lvlDebug, format("copying outputs of %1% from %2%") % step->drvPath % machine->sshName);
PathSet outputs; PathSet outputs;
for (auto & output : step->drv.outputs) for (auto & output : step->drv.outputs)
outputs.insert(output.second.path); outputs.insert(output.second.path);
MaintainCount mc(nrStepsCopyingFrom); MaintainCount mc(nrStepsCopyingFrom);
result.accessor = destStore->getFSAccessor();
auto now1 = std::chrono::steady_clock::now(); auto now1 = std::chrono::steady_clock::now();
copyClosureFrom(store, from, to, outputs, bytesReceived); to << cmdExportPaths << 0 << outputs;
to.flush();
destStore->importPaths(false, from, result.accessor);
auto now2 = std::chrono::steady_clock::now(); auto now2 = std::chrono::steady_clock::now();

View file

@ -2,26 +2,13 @@
#include "store-api.hh" #include "store-api.hh"
#include "util.hh" #include "util.hh"
#include "regex.hh" #include "regex.hh"
#include "fs-accessor.hh"
using namespace nix; using namespace nix;
static std::tuple<bool, string> secureRead(Path fileName) BuildOutput getBuildOutput(nix::ref<Store> store,
{ nix::ref<nix::FSAccessor> accessor, const Derivation & drv)
auto fail = std::make_tuple(false, "");
if (!pathExists(fileName)) return fail;
try {
/* For security, resolve symlinks. */
fileName = canonPath(fileName, true);
if (!isInStore(fileName)) return fail;
return std::make_tuple(true, readFile(fileName));
} catch (Error & e) { return fail; }
}
BuildOutput getBuildOutput(nix::ref<Store> store, const Derivation & drv)
{ {
BuildOutput res; BuildOutput res;
@ -52,14 +39,16 @@ BuildOutput getBuildOutput(nix::ref<Store> store, const Derivation & drv)
for (auto & output : outputs) { for (auto & output : outputs) {
Path failedFile = output + "/nix-support/failed"; Path failedFile = output + "/nix-support/failed";
if (pathExists(failedFile)) res.failed = true; if (accessor->stat(failedFile).type == FSAccessor::Type::tRegular)
res.failed = true;
auto file = secureRead(output + "/nix-support/hydra-build-products"); Path productsFile = output + "/nix-support/hydra-build-products";
if (!std::get<0>(file)) continue; if (accessor->stat(productsFile).type != FSAccessor::Type::tRegular)
continue;
explicitProducts = true; explicitProducts = true;
for (auto & line : tokenizeString<Strings>(std::get<1>(file), "\n")) { for (auto & line : tokenizeString<Strings>(accessor->readFile(productsFile), "\n")) {
BuildProduct product; BuildProduct product;
Regex::Subs subs; Regex::Subs subs;
@ -72,26 +61,23 @@ BuildOutput getBuildOutput(nix::ref<Store> store, const Derivation & drv)
/* Ensure that the path exists and points into the Nix /* Ensure that the path exists and points into the Nix
store. */ store. */
// FIXME: should we disallow products referring to other
// store paths, or that are outside the input closure?
if (product.path == "" || product.path[0] != '/') continue; if (product.path == "" || product.path[0] != '/') continue;
try { product.path = canonPath(product.path);
product.path = canonPath(product.path, true); if (!isInStore(product.path)) continue;
} catch (Error & e) { continue; }
if (!isInStore(product.path) || !pathExists(product.path)) continue;
/* FIXME: check that the path is in the input closure auto st = accessor->stat(product.path);
of the build? */ if (st.type == FSAccessor::Type::tMissing) continue;
product.name = product.path == output ? "" : baseNameOf(product.path); product.name = product.path == output ? "" : baseNameOf(product.path);
struct stat st; if (st.type == FSAccessor::Type::tRegular) {
if (stat(product.path.c_str(), &st))
throw SysError(format("getting status of %1%") % product.path);
if (S_ISREG(st.st_mode)) {
product.isRegular = true; product.isRegular = true;
product.fileSize = st.st_size; product.fileSize = st.fileSize;
product.sha1hash = hashFile(htSHA1, product.path); auto contents = accessor->readFile(product.path);
product.sha256hash = hashFile(htSHA256, product.path); product.sha1hash = hashString(htSHA1, contents);
product.sha256hash = hashString(htSHA256, contents);
} }
res.products.push_back(product); res.products.push_back(product);
@ -108,10 +94,10 @@ BuildOutput getBuildOutput(nix::ref<Store> store, const Derivation & drv)
product.subtype = output.first == "out" ? "" : output.first; product.subtype = output.first == "out" ? "" : output.first;
product.name = storePathToName(product.path); product.name = storePathToName(product.path);
struct stat st; auto st = accessor->stat(product.path);
if (stat(product.path.c_str(), &st)) if (st.type == FSAccessor::Type::tMissing)
throw SysError(format("getting status of %1%") % product.path); throw Error(format("getting status of %1%") % product.path);
if (S_ISDIR(st.st_mode)) if (st.type == FSAccessor::Type::tDirectory)
res.products.push_back(product); res.products.push_back(product);
} }
} }
@ -119,17 +105,18 @@ BuildOutput getBuildOutput(nix::ref<Store> store, const Derivation & drv)
/* Get the release name from $output/nix-support/hydra-release-name. */ /* Get the release name from $output/nix-support/hydra-release-name. */
for (auto & output : outputs) { for (auto & output : outputs) {
Path p = output + "/nix-support/hydra-release-name"; Path p = output + "/nix-support/hydra-release-name";
if (!pathExists(p)) continue; if (accessor->stat(p).type != FSAccessor::Type::tRegular) continue;
try { try {
res.releaseName = trim(readFile(p)); res.releaseName = trim(accessor->readFile(p));
} catch (Error & e) { continue; } } catch (Error & e) { continue; }
// FIXME: validate release name // FIXME: validate release name
} }
/* Get metrics. */ /* Get metrics. */
for (auto & output : outputs) { for (auto & output : outputs) {
auto file = secureRead(output + "/nix-support/hydra-metrics"); Path metricsFile = output + "/nix-support/hydra-metrics";
for (auto & line : tokenizeString<Strings>(std::get<1>(file), "\n")) { if (accessor->stat(metricsFile).type != FSAccessor::Type::tRegular) continue;
for (auto & line : tokenizeString<Strings>(accessor->readFile(metricsFile), "\n")) {
auto fields = tokenizeString<std::vector<std::string>>(line); auto fields = tokenizeString<std::vector<std::string>>(line);
if (fields.size() < 2) continue; if (fields.size() < 2) continue;
BuildMetric metric; BuildMetric metric;

View file

@ -4,6 +4,7 @@
#include "hash.hh" #include "hash.hh"
#include "derivations.hh" #include "derivations.hh"
#include "store-api.hh"
struct BuildProduct struct BuildProduct
{ {
@ -37,4 +38,5 @@ struct BuildOutput
std::map<std::string, BuildMetric> metrics; std::map<std::string, BuildMetric> metrics;
}; };
BuildOutput getBuildOutput(nix::ref<nix::Store> store, const nix::Derivation & drv); BuildOutput getBuildOutput(nix::ref<nix::Store> store,
nix::ref<nix::FSAccessor> accessor, const nix::Derivation & drv);

View file

@ -15,8 +15,8 @@ void State::builder(MachineReservation::ptr reservation)
auto step = reservation->step; auto step = reservation->step;
try { try {
auto store = openStore(); // FIXME: pool auto destStore = getDestStore();
retry = doBuildStep(store, step, reservation->machine); retry = doBuildStep(destStore, step, reservation->machine);
} catch (std::exception & e) { } catch (std::exception & e) {
printMsg(lvlError, format("uncaught exception building %1% on %2%: %3%") printMsg(lvlError, format("uncaught exception building %1% on %2%: %3%")
% step->drvPath % reservation->machine->sshName % e.what()); % step->drvPath % reservation->machine->sshName % e.what());
@ -45,7 +45,7 @@ void State::builder(MachineReservation::ptr reservation)
} }
bool State::doBuildStep(nix::ref<Store> store, Step::ptr step, bool State::doBuildStep(nix::ref<Store> destStore, Step::ptr step,
Machine::ptr machine) Machine::ptr machine)
{ {
{ {
@ -112,6 +112,7 @@ bool State::doBuildStep(nix::ref<Store> store, Step::ptr step,
/* Create a build step record indicating that we started /* Create a build step record indicating that we started
building. */ building. */
{ {
auto mc = startDbUpdate();
pqxx::work txn(*conn); pqxx::work txn(*conn);
stepNr = createBuildStep(txn, result.startTime, build, step, machine->sshName, bssBusy); stepNr = createBuildStep(txn, result.startTime, build, step, machine->sshName, bssBusy);
txn.commit(); txn.commit();
@ -120,13 +121,14 @@ bool State::doBuildStep(nix::ref<Store> store, Step::ptr step,
/* Do the build. */ /* Do the build. */
try { try {
/* FIXME: referring builds may have conflicting timeouts. */ /* FIXME: referring builds may have conflicting timeouts. */
buildRemote(store, machine, step, build->maxSilentTime, build->buildTimeout, result); buildRemote(destStore, machine, step, build->maxSilentTime, build->buildTimeout, result);
} catch (Error & e) { } catch (Error & e) {
result.status = BuildResult::MiscFailure; result.status = BuildResult::MiscFailure;
result.errorMsg = e.msg(); result.errorMsg = e.msg();
} }
if (result.success()) res = getBuildOutput(store, step->drv); if (result.success())
res = getBuildOutput(destStore, ref<FSAccessor>(result.accessor), step->drv);
} }
time_t stepStopTime = time(0); time_t stepStopTime = time(0);
@ -164,6 +166,7 @@ bool State::doBuildStep(nix::ref<Store> store, Step::ptr step,
retry = step_->tries + 1 < maxTries; retry = step_->tries + 1 < maxTries;
} }
if (retry) { if (retry) {
auto mc = startDbUpdate();
pqxx::work txn(*conn); pqxx::work txn(*conn);
finishBuildStep(txn, result.startTime, result.stopTime, result.overhead, build->id, finishBuildStep(txn, result.startTime, result.stopTime, result.overhead, build->id,
stepNr, machine->sshName, bssAborted, result.errorMsg); stepNr, machine->sshName, bssAborted, result.errorMsg);
@ -212,6 +215,8 @@ bool State::doBuildStep(nix::ref<Store> store, Step::ptr step,
/* Update the database. */ /* Update the database. */
{ {
auto mc = startDbUpdate();
pqxx::work txn(*conn); pqxx::work txn(*conn);
finishBuildStep(txn, result.startTime, result.stopTime, result.overhead, finishBuildStep(txn, result.startTime, result.stopTime, result.overhead,
@ -298,6 +303,8 @@ bool State::doBuildStep(nix::ref<Store> store, Step::ptr step,
/* Update the database. */ /* Update the database. */
{ {
auto mc = startDbUpdate();
pqxx::work txn(*conn); pqxx::work txn(*conn);
BuildStatus buildStatus = BuildStatus buildStatus =

View file

@ -1,6 +1,7 @@
#pragma once #pragma once
#include <atomic> #include <atomic>
#include <functional>
typedef std::atomic<unsigned long> counter; typedef std::atomic<unsigned long> counter;
@ -8,5 +9,9 @@ struct MaintainCount
{ {
counter & c; counter & c;
MaintainCount(counter & c) : c(c) { c++; } MaintainCount(counter & c) : c(c) { c++; }
MaintainCount(counter & c, std::function<void(unsigned long)> warn) : c(c)
{
warn(++c);
}
~MaintainCount() { auto prev = c--; assert(prev); } ~MaintainCount() { auto prev = c--; assert(prev); }
}; };

View file

@ -0,0 +1,12 @@
#pragma once
/* A trivial class to run a function at the end of a scope. */
class Finally
{
private:
std::function<void()> fun;
public:
Finally(std::function<void()> fun) : fun(fun) { }
~Finally() { fun(); }
};

View file

@ -7,6 +7,8 @@
#include "state.hh" #include "state.hh"
#include "build-result.hh" #include "build-result.hh"
#include "local-binary-cache-store.hh"
#include "s3-binary-cache-store.hh"
#include "shared.hh" #include "shared.hh"
#include "globals.hh" #include "globals.hh"
@ -20,10 +22,51 @@ State::State()
hydraData = getEnv("HYDRA_DATA"); hydraData = getEnv("HYDRA_DATA");
if (hydraData == "") throw Error("$HYDRA_DATA must be set"); if (hydraData == "") throw Error("$HYDRA_DATA must be set");
/* Read hydra.conf. */
auto hydraConfigFile = getEnv("HYDRA_CONFIG");
if (pathExists(hydraConfigFile)) {
for (auto line : tokenizeString<Strings>(readFile(hydraConfigFile), "\n")) {
line = trim(string(line, 0, line.find('#')));
auto eq = line.find('=');
if (eq == std::string::npos) continue;
auto key = trim(std::string(line, 0, eq));
auto value = trim(std::string(line, eq + 1));
if (key == "") continue;
hydraConfig[key] = value;
}
}
logDir = canonPath(hydraData + "/build-logs"); logDir = canonPath(hydraData + "/build-logs");
} }
MaintainCount State::startDbUpdate()
{
return MaintainCount(nrActiveDbUpdates, [](unsigned long c) {
if (c > 6) {
printMsg(lvlError, format("warning: %d concurrent database updates; PostgreSQL may be stalled") % c);
}
});
}
ref<Store> State::getLocalStore()
{
return ref<Store>(_localStore);
}
ref<Store> State::getDestStore()
{
return ref<Store>(_destStore);
}
void State::parseMachines(const std::string & contents) void State::parseMachines(const std::string & contents)
{ {
Machines newMachines, oldMachines; Machines newMachines, oldMachines;
@ -94,7 +137,8 @@ void State::monitorMachinesFile()
getEnv("NIX_REMOTE_SYSTEMS", pathExists(defaultMachinesFile) ? defaultMachinesFile : ""), ":"); getEnv("NIX_REMOTE_SYSTEMS", pathExists(defaultMachinesFile) ? defaultMachinesFile : ""), ":");
if (machinesFiles.empty()) { if (machinesFiles.empty()) {
parseMachines("localhost " + settings.thisSystem parseMachines("localhost " +
(settings.thisSystem == "x86_64-linux" ? "x86_64-linux,i686-linux" : settings.thisSystem)
+ " - " + std::to_string(settings.maxBuildJobs) + " 1"); + " - " + std::to_string(settings.maxBuildJobs) + " 1");
return; return;
} }
@ -502,8 +546,8 @@ void State::dumpStatus(Connection & conn, bool log)
root.attr("nrStepsCopyingTo", nrStepsCopyingTo); root.attr("nrStepsCopyingTo", nrStepsCopyingTo);
root.attr("nrStepsCopyingFrom", nrStepsCopyingFrom); root.attr("nrStepsCopyingFrom", nrStepsCopyingFrom);
root.attr("nrStepsWaiting", nrStepsWaiting); root.attr("nrStepsWaiting", nrStepsWaiting);
root.attr("bytesSent"); out << bytesSent; root.attr("bytesSent", bytesSent);
root.attr("bytesReceived"); out << bytesReceived; root.attr("bytesReceived", bytesReceived);
root.attr("nrBuildsRead", nrBuildsRead); root.attr("nrBuildsRead", nrBuildsRead);
root.attr("nrBuildsDone", nrBuildsDone); root.attr("nrBuildsDone", nrBuildsDone);
root.attr("nrStepsDone", nrStepsDone); root.attr("nrStepsDone", nrStepsDone);
@ -512,12 +556,13 @@ void State::dumpStatus(Connection & conn, bool log)
if (nrStepsDone) { if (nrStepsDone) {
root.attr("totalStepTime", totalStepTime); root.attr("totalStepTime", totalStepTime);
root.attr("totalStepBuildTime", totalStepBuildTime); root.attr("totalStepBuildTime", totalStepBuildTime);
root.attr("avgStepTime"); out << (float) totalStepTime / nrStepsDone; root.attr("avgStepTime", (float) totalStepTime / nrStepsDone);
root.attr("avgStepBuildTime"); out << (float) totalStepBuildTime / nrStepsDone; root.attr("avgStepBuildTime", (float) totalStepBuildTime / nrStepsDone);
} }
root.attr("nrQueueWakeups", nrQueueWakeups); root.attr("nrQueueWakeups", nrQueueWakeups);
root.attr("nrDispatcherWakeups", nrDispatcherWakeups); root.attr("nrDispatcherWakeups", nrDispatcherWakeups);
root.attr("nrDbConnections", dbPool.count()); root.attr("nrDbConnections", dbPool.count());
root.attr("nrActiveDbUpdates", nrActiveDbUpdates);
{ {
root.attr("machines"); root.attr("machines");
JSONObject nested(out); JSONObject nested(out);
@ -535,8 +580,8 @@ void State::dumpStatus(Connection & conn, bool log)
if (m->state->nrStepsDone) { if (m->state->nrStepsDone) {
nested2.attr("totalStepTime", s->totalStepTime); nested2.attr("totalStepTime", s->totalStepTime);
nested2.attr("totalStepBuildTime", s->totalStepBuildTime); nested2.attr("totalStepBuildTime", s->totalStepBuildTime);
nested2.attr("avgStepTime"); out << (float) s->totalStepTime / s->nrStepsDone; nested2.attr("avgStepTime", (float) s->totalStepTime / s->nrStepsDone);
nested2.attr("avgStepBuildTime"); out << (float) s->totalStepBuildTime / s->nrStepsDone; nested2.attr("avgStepBuildTime", (float) s->totalStepBuildTime / s->nrStepsDone);
} }
} }
} }
@ -547,7 +592,7 @@ void State::dumpStatus(Connection & conn, bool log)
for (auto & jobset : *jobsets_) { for (auto & jobset : *jobsets_) {
nested.attr(jobset.first.first + ":" + jobset.first.second); nested.attr(jobset.first.first + ":" + jobset.first.second);
JSONObject nested2(out); JSONObject nested2(out);
nested2.attr("shareUsed"); out << jobset.second->shareUsed(); nested2.attr("shareUsed", jobset.second->shareUsed());
nested2.attr("seconds", jobset.second->getSeconds()); nested2.attr("seconds", jobset.second->getSeconds());
} }
} }
@ -567,11 +612,67 @@ void State::dumpStatus(Connection & conn, bool log)
nested2.attr("lastActive", std::chrono::system_clock::to_time_t(i.second.lastActive)); nested2.attr("lastActive", std::chrono::system_clock::to_time_t(i.second.lastActive));
} }
} }
auto store = dynamic_cast<S3BinaryCacheStore *>(&*getDestStore());
if (store) {
root.attr("store");
JSONObject nested(out);
auto & stats = store->getStats();
nested.attr("narInfoRead", stats.narInfoRead);
nested.attr("narInfoReadAverted", stats.narInfoReadAverted);
nested.attr("narInfoWrite", stats.narInfoWrite);
nested.attr("narInfoCacheSize", stats.narInfoCacheSize);
nested.attr("narRead", stats.narRead);
nested.attr("narReadBytes", stats.narReadBytes);
nested.attr("narReadCompressedBytes", stats.narReadCompressedBytes);
nested.attr("narWrite", stats.narWrite);
nested.attr("narWriteAverted", stats.narWriteAverted);
nested.attr("narWriteBytes", stats.narWriteBytes);
nested.attr("narWriteCompressedBytes", stats.narWriteCompressedBytes);
nested.attr("narWriteCompressionTimeMs", stats.narWriteCompressionTimeMs);
nested.attr("narCompressionSavings",
stats.narWriteBytes
? 1.0 - (double) stats.narWriteCompressedBytes / stats.narWriteBytes
: 0.0);
nested.attr("narCompressionSpeed", // MiB/s
stats.narWriteCompressionTimeMs
? (double) stats.narWriteBytes / stats.narWriteCompressionTimeMs * 1000.0 / (1024.0 * 1024.0)
: 0.0);
auto s3Store = dynamic_cast<S3BinaryCacheStore *>(&*store);
if (s3Store) {
nested.attr("s3");
JSONObject nested2(out);
auto & s3Stats = s3Store->getS3Stats();
nested2.attr("put", s3Stats.put);
nested2.attr("putBytes", s3Stats.putBytes);
nested2.attr("putTimeMs", s3Stats.putTimeMs);
nested2.attr("putSpeed",
s3Stats.putTimeMs
? (double) s3Stats.putBytes / s3Stats.putTimeMs * 1000.0 / (1024.0 * 1024.0)
: 0.0);
nested2.attr("get", s3Stats.get);
nested2.attr("getBytes", s3Stats.getBytes);
nested2.attr("getTimeMs", s3Stats.getTimeMs);
nested2.attr("getSpeed",
s3Stats.getTimeMs
? (double) s3Stats.getBytes / s3Stats.getTimeMs * 1000.0 / (1024.0 * 1024.0)
: 0.0);
nested2.attr("head", s3Stats.head);
nested2.attr("costDollarApprox",
(s3Stats.get + s3Stats.head) / 10000.0 * 0.004
+ s3Stats.put / 1000.0 * 0.005 +
+ s3Stats.getBytes / (1024.0 * 1024.0 * 1024.0) * 0.09);
}
}
} }
if (log) printMsg(lvlInfo, format("status: %1%") % out.str()); if (log) printMsg(lvlInfo, format("status: %1%") % out.str());
{ {
auto mc = startDbUpdate();
pqxx::work txn(conn); pqxx::work txn(conn);
// FIXME: use PostgreSQL 9.5 upsert. // FIXME: use PostgreSQL 9.5 upsert.
txn.exec("delete from SystemStatus where what = 'queue-runner'"); txn.exec("delete from SystemStatus where what = 'queue-runner'");
@ -655,6 +756,40 @@ void State::run(BuildID buildOne)
if (!lock) if (!lock)
throw Error("hydra-queue-runner is already running"); throw Error("hydra-queue-runner is already running");
auto storeMode = hydraConfig["store_mode"];
_localStore = openStore();
if (storeMode == "direct" || storeMode == "") {
_destStore = _localStore;
}
else if (storeMode == "local-binary-cache") {
auto dir = hydraConfig["binary_cache_dir"];
if (dir == "")
throw Error("you must set binary_cache_dir in hydra.conf");
auto store = make_ref<LocalBinaryCacheStore>(
_localStore,
hydraConfig["binary_cache_secret_key_file"],
hydraConfig["binary_cache_public_key_file"],
dir);
store->init();
_destStore = std::shared_ptr<LocalBinaryCacheStore>(store);
}
else if (storeMode == "s3-binary-cache") {
auto bucketName = hydraConfig["binary_cache_s3_bucket"];
if (bucketName == "")
throw Error("you must set binary_cache_s3_bucket in hydra.conf");
auto store = make_ref<S3BinaryCacheStore>(
_localStore,
hydraConfig["binary_cache_secret_key_file"],
hydraConfig["binary_cache_public_key_file"],
bucketName);
store->init();
_destStore = std::shared_ptr<S3BinaryCacheStore>(store);
}
{ {
auto conn(dbPool.get()); auto conn(dbPool.get());
clearBusy(*conn, 0); clearBusy(*conn, 0);
@ -679,10 +814,10 @@ void State::run(BuildID buildOne)
while (true) { while (true) {
try { try {
auto conn(dbPool.get()); auto conn(dbPool.get());
receiver dumpStatus(*conn, "dump_status"); receiver dumpStatus_(*conn, "dump_status");
while (true) { while (true) {
bool timeout = conn->await_notification(300, 0) == 0; bool timeout = conn->await_notification(300, 0) == 0;
State::dumpStatus(*conn, timeout); dumpStatus(*conn, timeout);
} }
} catch (std::exception & e) { } catch (std::exception & e) {
printMsg(lvlError, format("main thread: %1%") % e.what()); printMsg(lvlError, format("main thread: %1%") % e.what());

View file

@ -1,85 +0,0 @@
#pragma once
#include <memory>
#include <list>
#include "sync.hh"
/* This template class implements a simple pool manager of resources
of some type R, such as database connections. It is used as
follows:
class Connection { ... };
Pool<Connection> pool;
{
auto conn(pool.get());
conn->exec("select ...");
}
Here, the Connection object referenced by conn is automatically
returned to the pool when conn goes out of scope.
*/
template <class R>
class Pool
{
private:
struct State
{
unsigned int count = 0;
std::list<std::shared_ptr<R>> idle;
};
Sync<State> state;
public:
class Handle
{
private:
Pool & pool;
std::shared_ptr<R> r;
friend Pool;
Handle(Pool & pool, std::shared_ptr<R> r) : pool(pool), r(r) { }
public:
Handle(Handle && h) : pool(h.pool), r(h.r) { h.r.reset(); }
Handle(const Handle & l) = delete;
~Handle()
{
auto state_(pool.state.lock());
if (r) state_->idle.push_back(r);
}
R * operator -> () { return r.get(); }
R & operator * () { return *r; }
};
Handle get()
{
{
auto state_(state.lock());
if (!state_->idle.empty()) {
auto p = state_->idle.back();
state_->idle.pop_back();
return Handle(*this, p);
}
state_->count++;
}
/* Note: we don't hold the lock while creating a new instance,
because creation might take a long time. */
return Handle(*this, std::make_shared<R>());
}
unsigned int count()
{
auto state_(state.lock());
return state_->count;
}
};

View file

@ -30,12 +30,13 @@ void State::queueMonitorLoop()
receiver buildsBumped(*conn, "builds_bumped"); receiver buildsBumped(*conn, "builds_bumped");
receiver jobsetSharesChanged(*conn, "jobset_shares_changed"); receiver jobsetSharesChanged(*conn, "jobset_shares_changed");
auto store = openStore(); // FIXME: pool auto localStore = getLocalStore();
auto destStore = getDestStore();
unsigned int lastBuildId = 0; unsigned int lastBuildId = 0;
while (true) { while (true) {
bool done = getQueuedBuilds(*conn, store, lastBuildId); bool done = getQueuedBuilds(*conn, localStore, destStore, lastBuildId);
/* Sleep until we get notification from the database about an /* Sleep until we get notification from the database about an
event. */ event. */
@ -63,7 +64,8 @@ void State::queueMonitorLoop()
} }
bool State::getQueuedBuilds(Connection & conn, ref<Store> store, unsigned int & lastBuildId) bool State::getQueuedBuilds(Connection & conn, ref<Store> localStore,
ref<Store> destStore, unsigned int & lastBuildId)
{ {
printMsg(lvlInfo, format("checking the queue for builds > %1%...") % lastBuildId); printMsg(lvlInfo, format("checking the queue for builds > %1%...") % lastBuildId);
@ -118,10 +120,11 @@ bool State::getQueuedBuilds(Connection & conn, ref<Store> store, unsigned int &
nrAdded++; nrAdded++;
newBuildsByID.erase(build->id); newBuildsByID.erase(build->id);
if (!store->isValidPath(build->drvPath)) { if (!localStore->isValidPath(build->drvPath)) {
/* Derivation has been GC'ed prematurely. */ /* Derivation has been GC'ed prematurely. */
printMsg(lvlError, format("aborting GC'ed build %1%") % build->id); printMsg(lvlError, format("aborting GC'ed build %1%") % build->id);
if (!build->finishedInDB) { if (!build->finishedInDB) {
auto mc = startDbUpdate();
pqxx::work txn(conn); pqxx::work txn(conn);
txn.parameterized txn.parameterized
("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1 and finished = 0") ("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1 and finished = 0")
@ -138,7 +141,8 @@ bool State::getQueuedBuilds(Connection & conn, ref<Store> store, unsigned int &
std::set<Step::ptr> newSteps; std::set<Step::ptr> newSteps;
std::set<Path> finishedDrvs; // FIXME: re-use? std::set<Path> finishedDrvs; // FIXME: re-use?
Step::ptr step = createStep(store, conn, build, build->drvPath, build, 0, finishedDrvs, newSteps, newRunnable); Step::ptr step = createStep(destStore, conn, build, build->drvPath,
build, 0, finishedDrvs, newSteps, newRunnable);
/* Some of the new steps may be the top level of builds that /* Some of the new steps may be the top level of builds that
we haven't processed yet. So do them now. This ensures that we haven't processed yet. So do them now. This ensures that
@ -156,12 +160,15 @@ bool State::getQueuedBuilds(Connection & conn, ref<Store> store, unsigned int &
all valid. So we mark this as a finished, cached build. */ all valid. So we mark this as a finished, cached build. */
if (!step) { if (!step) {
Derivation drv = readDerivation(build->drvPath); Derivation drv = readDerivation(build->drvPath);
BuildOutput res = getBuildOutput(store, drv); BuildOutput res = getBuildOutput(destStore, destStore->getFSAccessor(), drv);
{
auto mc = startDbUpdate();
pqxx::work txn(conn); pqxx::work txn(conn);
time_t now = time(0); time_t now = time(0);
markSucceededBuild(txn, build, res, true, now, now); markSucceededBuild(txn, build, res, true, now, now);
txn.commit(); txn.commit();
}
build->finishedInDB = true; build->finishedInDB = true;
@ -175,6 +182,7 @@ bool State::getQueuedBuilds(Connection & conn, ref<Store> store, unsigned int &
if (checkCachedFailure(r, conn)) { if (checkCachedFailure(r, conn)) {
printMsg(lvlError, format("marking build %1% as cached failure") % build->id); printMsg(lvlError, format("marking build %1% as cached failure") % build->id);
if (!build->finishedInDB) { if (!build->finishedInDB) {
auto mc = startDbUpdate();
pqxx::work txn(conn); pqxx::work txn(conn);
/* Find the previous build step record, first by /* Find the previous build step record, first by
@ -314,7 +322,7 @@ void State::processQueueChange(Connection & conn)
} }
Step::ptr State::createStep(ref<Store> store, Step::ptr State::createStep(ref<Store> destStore,
Connection & conn, Build::ptr build, const Path & drvPath, Connection & conn, Build::ptr build, const Path & drvPath,
Build::ptr referringBuild, Step::ptr referringStep, std::set<Path> & finishedDrvs, Build::ptr referringBuild, Step::ptr referringStep, std::set<Path> & finishedDrvs,
std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable) std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable)
@ -394,7 +402,7 @@ Step::ptr State::createStep(ref<Store> store,
DerivationOutputs missing; DerivationOutputs missing;
PathSet missingPaths; PathSet missingPaths;
for (auto & i : step->drv.outputs) for (auto & i : step->drv.outputs)
if (!store->isValidPath(i.second.path)) { if (!destStore->isValidPath(i.second.path)) {
valid = false; valid = false;
missing[i.first] = i.second; missing[i.first] = i.second;
missingPaths.insert(i.second.path); missingPaths.insert(i.second.path);
@ -406,7 +414,7 @@ Step::ptr State::createStep(ref<Store> store,
assert(missing.size() == missingPaths.size()); assert(missing.size() == missingPaths.size());
if (!missing.empty() && settings.useSubstitutes) { if (!missing.empty() && settings.useSubstitutes) {
SubstitutablePathInfos infos; SubstitutablePathInfos infos;
store->querySubstitutablePathInfos(missingPaths, infos); destStore->querySubstitutablePathInfos(missingPaths, infos); // FIXME
if (infos.size() == missingPaths.size()) { if (infos.size() == missingPaths.size()) {
valid = true; valid = true;
for (auto & i : missing) { for (auto & i : missing) {
@ -414,10 +422,11 @@ Step::ptr State::createStep(ref<Store> store,
printMsg(lvlInfo, format("substituting output %1% of %2%") % i.second.path % drvPath); printMsg(lvlInfo, format("substituting output %1% of %2%") % i.second.path % drvPath);
time_t startTime = time(0); time_t startTime = time(0);
store->ensurePath(i.second.path); destStore->ensurePath(i.second.path);
time_t stopTime = time(0); time_t stopTime = time(0);
{ {
auto mc = startDbUpdate();
pqxx::work txn(conn); pqxx::work txn(conn);
createSubstitutionStep(txn, startTime, stopTime, build, drvPath, "out", i.second.path); createSubstitutionStep(txn, startTime, stopTime, build, drvPath, "out", i.second.path);
txn.commit(); txn.commit();
@ -443,7 +452,7 @@ Step::ptr State::createStep(ref<Store> store,
/* Create steps for the dependencies. */ /* Create steps for the dependencies. */
for (auto & i : step->drv.inputDrvs) { for (auto & i : step->drv.inputDrvs) {
auto dep = createStep(store, conn, build, i.first, 0, step, finishedDrvs, newSteps, newRunnable); auto dep = createStep(destStore, conn, build, i.first, 0, step, finishedDrvs, newSteps, newRunnable);
if (dep) { if (dep) {
auto step_(step->state.lock()); auto step_(step->state.lock());
step_->deps.insert(dep); step_->deps.insert(dep);

View file

@ -0,0 +1,180 @@
#include "s3-binary-cache-store.hh"
#include "nar-info.hh"
#include <aws/core/client/ClientConfiguration.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/CreateBucketRequest.h>
#include <aws/s3/model/GetBucketLocationRequest.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/PutObjectRequest.h>
namespace nix {
struct S3Error : public Error
{
Aws::S3::S3Errors err;
S3Error(Aws::S3::S3Errors err, const FormatOrString & fs)
: Error(fs), err(err) { };
};
/* Helper: given an Outcome<R, E>, return R in case of success, or
throw an exception in case of an error. */
template<typename R, typename E>
R && checkAws(const FormatOrString & fs, Aws::Utils::Outcome<R, E> && outcome)
{
if (!outcome.IsSuccess())
throw S3Error(
outcome.GetError().GetErrorType(),
fs.s + ": " + outcome.GetError().GetMessage());
return outcome.GetResultWithOwnership();
}
S3BinaryCacheStore::S3BinaryCacheStore(std::shared_ptr<Store> localStore,
const Path & secretKeyFile, const Path & publicKeyFile,
const std::string & bucketName)
: BinaryCacheStore(localStore, secretKeyFile, publicKeyFile)
, bucketName(bucketName)
, config(makeConfig())
, client(make_ref<Aws::S3::S3Client>(*config))
{
}
ref<Aws::Client::ClientConfiguration> S3BinaryCacheStore::makeConfig()
{
auto res = make_ref<Aws::Client::ClientConfiguration>();
res->region = Aws::Region::US_EAST_1;
res->requestTimeoutMs = 600 * 1000;
return res;
}
void S3BinaryCacheStore::init()
{
/* Create the bucket if it doesn't already exists. */
// FIXME: HeadBucket would be more appropriate, but doesn't return
// an easily parsed 404 message.
auto res = client->GetBucketLocation(
Aws::S3::Model::GetBucketLocationRequest().WithBucket(bucketName));
if (!res.IsSuccess()) {
if (res.GetError().GetErrorType() != Aws::S3::S3Errors::NO_SUCH_BUCKET)
throw Error(format("AWS error checking bucket %s: %s") % bucketName % res.GetError().GetMessage());
checkAws(format("AWS error creating bucket %s") % bucketName,
client->CreateBucket(
Aws::S3::Model::CreateBucketRequest()
.WithBucket(bucketName)
.WithCreateBucketConfiguration(
Aws::S3::Model::CreateBucketConfiguration()
/* .WithLocationConstraint(
Aws::S3::Model::BucketLocationConstraint::US) */ )));
}
BinaryCacheStore::init();
}
const S3BinaryCacheStore::Stats & S3BinaryCacheStore::getS3Stats()
{
return stats;
}
/* This is a specialisation of isValidPath() that optimistically
fetches the .narinfo file, rather than first checking for its
existence via a HEAD request. Since .narinfos are small, doing a
GET is unlikely to be slower than HEAD. */
bool S3BinaryCacheStore::isValidPath(const Path & storePath)
{
try {
readNarInfo(storePath);
return true;
} catch (S3Error & e) {
if (e.err == Aws::S3::S3Errors::NO_SUCH_KEY) return false;
throw;
}
}
bool S3BinaryCacheStore::fileExists(const std::string & path)
{
stats.head++;
auto res = client->HeadObject(
Aws::S3::Model::HeadObjectRequest()
.WithBucket(bucketName)
.WithKey(path));
if (!res.IsSuccess()) {
auto & error = res.GetError();
if (error.GetErrorType() == Aws::S3::S3Errors::UNKNOWN // FIXME
&& error.GetMessage().find("404") != std::string::npos)
return false;
throw Error(format("AWS error fetching %s") % path % error.GetMessage());
}
return true;
}
void S3BinaryCacheStore::upsertFile(const std::string & path, const std::string & data)
{
auto request =
Aws::S3::Model::PutObjectRequest()
.WithBucket(bucketName)
.WithKey(path);
auto stream = std::make_shared<std::stringstream>(data);
request.SetBody(stream);
stats.put++;
stats.putBytes += data.size();
auto now1 = std::chrono::steady_clock::now();
auto result = checkAws(format("AWS error uploading %s") % path,
client->PutObject(request));
auto now2 = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
printMsg(lvlInfo, format("uploaded s3://%1%/%2% (%3% bytes) in %4% ms")
% bucketName % path % data.size() % duration);
stats.putTimeMs += duration;
}
std::string S3BinaryCacheStore::getFile(const std::string & path)
{
auto request =
Aws::S3::Model::GetObjectRequest()
.WithBucket(bucketName)
.WithKey(path);
request.SetResponseStreamFactory([&]() {
return Aws::New<std::stringstream>("STRINGSTREAM");
});
stats.get++;
auto now1 = std::chrono::steady_clock::now();
auto result = checkAws(format("AWS error fetching %s") % path,
client->GetObject(request));
auto now2 = std::chrono::steady_clock::now();
auto res = dynamic_cast<std::stringstream &>(result.GetBody()).str();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
printMsg(lvlTalkative, format("downloaded s3://%1%/%2% (%3% bytes) in %4% ms")
% bucketName % path % res.size() % duration);
stats.getBytes += res.size();
stats.getTimeMs += duration;
return res;
}
}

View file

@ -0,0 +1,60 @@
#pragma once
#include "binary-cache-store.hh"
#include <atomic>
namespace Aws { namespace Client { class ClientConfiguration; } }
namespace Aws { namespace S3 { class S3Client; } }
namespace nix {
class S3BinaryCacheStore : public BinaryCacheStore
{
private:
std::string bucketName;
ref<Aws::Client::ClientConfiguration> config;
ref<Aws::S3::S3Client> client;
public:
S3BinaryCacheStore(std::shared_ptr<Store> localStore,
const Path & secretKeyFile, const Path & publicKeyFile,
const std::string & bucketName);
void init() override;
struct Stats
{
std::atomic<uint64_t> put{0};
std::atomic<uint64_t> putBytes{0};
std::atomic<uint64_t> putTimeMs{0};
std::atomic<uint64_t> get{0};
std::atomic<uint64_t> getBytes{0};
std::atomic<uint64_t> getTimeMs{0};
std::atomic<uint64_t> head{0};
};
const Stats & getS3Stats();
bool isValidPath(const Path & storePath) override;
private:
Stats stats;
ref<Aws::Client::ClientConfiguration> makeConfig();
protected:
bool fileExists(const std::string & path) override;
void upsertFile(const std::string & path, const std::string & data) override;
std::string getFile(const std::string & path) override;
};
}

View file

@ -16,6 +16,8 @@
#include "store-api.hh" #include "store-api.hh"
#include "derivations.hh" #include "derivations.hh"
#include "binary-cache-store.hh" // FIXME
typedef unsigned int BuildID; typedef unsigned int BuildID;
@ -51,6 +53,7 @@ struct RemoteResult : nix::BuildResult
time_t startTime = 0, stopTime = 0; time_t startTime = 0, stopTime = 0;
unsigned int overhead = 0; unsigned int overhead = 0;
nix::Path logFile; nix::Path logFile;
std::shared_ptr<nix::FSAccessor> accessor;
bool canRetry() bool canRetry()
{ {
@ -78,7 +81,7 @@ private:
std::atomic<unsigned int> shares{1}; std::atomic<unsigned int> shares{1};
/* The start time and duration of the most recent build steps. */ /* The start time and duration of the most recent build steps. */
Sync<std::map<time_t, time_t>> steps; nix::Sync<std::map<time_t, time_t>> steps;
public: public:
@ -185,7 +188,7 @@ struct Step
std::atomic_bool finished{false}; // debugging std::atomic_bool finished{false}; // debugging
Sync<State> state; nix::Sync<State> state;
~Step() ~Step()
{ {
@ -225,7 +228,7 @@ struct Machine
system_time lastFailure, disabledUntil; system_time lastFailure, disabledUntil;
unsigned int consecutiveFailures; unsigned int consecutiveFailures;
}; };
Sync<ConnectInfo> connectInfo; nix::Sync<ConnectInfo> connectInfo;
/* Mutex to prevent multiple threads from sending data to the /* Mutex to prevent multiple threads from sending data to the
same machine (which would be inefficient). */ same machine (which would be inefficient). */
@ -260,35 +263,37 @@ private:
nix::Path hydraData, logDir; nix::Path hydraData, logDir;
std::map<std::string, std::string> hydraConfig;
/* The queued builds. */ /* The queued builds. */
typedef std::map<BuildID, Build::ptr> Builds; typedef std::map<BuildID, Build::ptr> Builds;
Sync<Builds> builds; nix::Sync<Builds> builds;
/* The jobsets. */ /* The jobsets. */
typedef std::map<std::pair<std::string, std::string>, Jobset::ptr> Jobsets; typedef std::map<std::pair<std::string, std::string>, Jobset::ptr> Jobsets;
Sync<Jobsets> jobsets; nix::Sync<Jobsets> jobsets;
/* All active or pending build steps (i.e. dependencies of the /* All active or pending build steps (i.e. dependencies of the
queued builds). Note that these are weak pointers. Steps are queued builds). Note that these are weak pointers. Steps are
kept alive by being reachable from Builds or by being in kept alive by being reachable from Builds or by being in
progress. */ progress. */
typedef std::map<nix::Path, Step::wptr> Steps; typedef std::map<nix::Path, Step::wptr> Steps;
Sync<Steps> steps; nix::Sync<Steps> steps;
/* Build steps that have no unbuilt dependencies. */ /* Build steps that have no unbuilt dependencies. */
typedef std::list<Step::wptr> Runnable; typedef std::list<Step::wptr> Runnable;
Sync<Runnable> runnable; nix::Sync<Runnable> runnable;
/* CV for waking up the dispatcher. */ /* CV for waking up the dispatcher. */
Sync<bool> dispatcherWakeup; nix::Sync<bool> dispatcherWakeup;
std::condition_variable_any dispatcherWakeupCV; std::condition_variable dispatcherWakeupCV;
/* PostgreSQL connection pool. */ /* PostgreSQL connection pool. */
Pool<Connection> dbPool; nix::Pool<Connection> dbPool;
/* The build machines. */ /* The build machines. */
typedef std::map<std::string, Machine::ptr> Machines; typedef std::map<std::string, Machine::ptr> Machines;
Sync<Machines> machines; // FIXME: use atomic_shared_ptr nix::Sync<Machines> machines; // FIXME: use atomic_shared_ptr
/* Various stats. */ /* Various stats. */
time_t startedAt; time_t startedAt;
@ -308,18 +313,19 @@ private:
counter nrDispatcherWakeups{0}; counter nrDispatcherWakeups{0};
counter bytesSent{0}; counter bytesSent{0};
counter bytesReceived{0}; counter bytesReceived{0};
counter nrActiveDbUpdates{0};
/* Log compressor work queue. */ /* Log compressor work queue. */
Sync<std::queue<nix::Path>> logCompressorQueue; nix::Sync<std::queue<nix::Path>> logCompressorQueue;
std::condition_variable_any logCompressorWakeup; std::condition_variable logCompressorWakeup;
/* Notification sender work queue. FIXME: if hydra-queue-runner is /* Notification sender work queue. FIXME: if hydra-queue-runner is
killed before it has finished sending notifications about a killed before it has finished sending notifications about a
build, then the notifications may be lost. It would be better build, then the notifications may be lost. It would be better
to mark builds with pending notification in the database. */ to mark builds with pending notification in the database. */
typedef std::pair<BuildID, std::vector<BuildID>> NotificationItem; typedef std::pair<BuildID, std::vector<BuildID>> NotificationItem;
Sync<std::queue<NotificationItem>> notificationSenderQueue; nix::Sync<std::queue<NotificationItem>> notificationSenderQueue;
std::condition_variable_any notificationSenderWakeup; std::condition_variable notificationSenderWakeup;
/* Specific build to do for --build-one (testing only). */ /* Specific build to do for --build-one (testing only). */
BuildID buildOne; BuildID buildOne;
@ -332,7 +338,7 @@ private:
std::chrono::seconds waitTime; // time runnable steps have been waiting std::chrono::seconds waitTime; // time runnable steps have been waiting
}; };
Sync<std::map<std::string, MachineType>> machineTypes; nix::Sync<std::map<std::string, MachineType>> machineTypes;
struct MachineReservation struct MachineReservation
{ {
@ -346,11 +352,23 @@ private:
std::atomic<time_t> lastDispatcherCheck{0}; std::atomic<time_t> lastDispatcherCheck{0};
std::shared_ptr<nix::Store> _localStore;
std::shared_ptr<nix::Store> _destStore;
public: public:
State(); State();
private: private:
MaintainCount startDbUpdate();
/* Return a store object that can access derivations produced by
hydra-evaluator. */
nix::ref<nix::Store> getLocalStore();
/* Return a store object to store build results. */
nix::ref<nix::Store> getDestStore();
void clearBusy(Connection & conn, time_t stopTime); void clearBusy(Connection & conn, time_t stopTime);
void parseMachines(const std::string & contents); void parseMachines(const std::string & contents);
@ -379,7 +397,8 @@ private:
void queueMonitorLoop(); void queueMonitorLoop();
/* Check the queue for new builds. */ /* Check the queue for new builds. */
bool getQueuedBuilds(Connection & conn, nix::ref<nix::Store> store, unsigned int & lastBuildId); bool getQueuedBuilds(Connection & conn, nix::ref<nix::Store> localStore,
nix::ref<nix::Store> destStore, unsigned int & lastBuildId);
/* Handle cancellation, deletion and priority bumps. */ /* Handle cancellation, deletion and priority bumps. */
void processQueueChange(Connection & conn); void processQueueChange(Connection & conn);
@ -407,10 +426,10 @@ private:
/* Perform the given build step. Return true if the step is to be /* Perform the given build step. Return true if the step is to be
retried. */ retried. */
bool doBuildStep(nix::ref<nix::Store> store, Step::ptr step, bool doBuildStep(nix::ref<nix::Store> destStore, Step::ptr step,
Machine::ptr machine); Machine::ptr machine);
void buildRemote(nix::ref<nix::Store> store, void buildRemote(nix::ref<nix::Store> destStore,
Machine::ptr machine, Step::ptr step, Machine::ptr machine, Step::ptr step,
unsigned int maxSilentTime, unsigned int buildTimeout, unsigned int maxSilentTime, unsigned int buildTimeout,
RemoteResult & result); RemoteResult & result);

View file

@ -1,74 +0,0 @@
#pragma once
#include <mutex>
#include <condition_variable>
#include <cassert>
/* This template class ensures synchronized access to a value of type
T. It is used as follows:
struct Data { int x; ... };
Sync<Data> data;
{
auto data_(data.lock());
data_->x = 123;
}
Here, "data" is automatically unlocked when "data_" goes out of
scope.
*/
template <class T>
class Sync
{
private:
std::mutex mutex;
T data;
public:
Sync() { }
Sync(const T & data) : data(data) { }
class Lock
{
private:
Sync * s;
friend Sync;
Lock(Sync * s) : s(s) { s->mutex.lock(); }
public:
Lock(Lock && l) : s(l.s) { l.s = 0; }
Lock(const Lock & l) = delete;
~Lock() { if (s) s->mutex.unlock(); }
T * operator -> () { return &s->data; }
T & operator * () { return s->data; }
/* FIXME: performance impact of condition_variable_any? */
void wait(std::condition_variable_any & cv)
{
assert(s);
cv.wait(s->mutex);
}
template<class Rep, class Period, class Predicate>
bool wait_for(std::condition_variable_any & cv,
const std::chrono::duration<Rep, Period> & duration,
Predicate pred)
{
assert(s);
return cv.wait_for(s->mutex, duration, pred);
}
template<class Clock, class Duration>
std::cv_status wait_until(std::condition_variable_any & cv,
const std::chrono::time_point<Clock, Duration> & duration)
{
assert(s);
return cv.wait_until(s->mutex, duration);
}
};
Lock lock() { return Lock(this); }
};

View file

@ -14,7 +14,7 @@ class TokenServer
unsigned int maxTokens; unsigned int maxTokens;
Sync<unsigned int> curTokens{0}; Sync<unsigned int> curTokens{0};
std::condition_variable_any wakeup; std::condition_variable wakeup;
public: public:
TokenServer(unsigned int maxTokens) : maxTokens(maxTokens) { } TokenServer(unsigned int maxTokens) : maxTokens(maxTokens) { }

View file

@ -64,7 +64,10 @@ sub build_GET {
my $build = $c->stash->{build}; my $build = $c->stash->{build};
$c->stash->{template} = 'build.tt'; $c->stash->{template} = 'build.tt';
$c->stash->{available} = all { isValidPath($_->path) } $build->buildoutputs->all; $c->stash->{available} =
($c->config->{store_mode} // "direct") eq "direct"
? all { isValidPath($_->path) } $build->buildoutputs->all
: 1; # FIXME
$c->stash->{drvAvailable} = isValidPath $build->drvpath; $c->stash->{drvAvailable} = isValidPath $build->drvpath;
if ($build->finished && $build->iscachedbuild) { if ($build->finished && $build->iscachedbuild) {
@ -219,7 +222,33 @@ sub download : Chained('buildChain') PathPart {
} }
notFound($c, "Build doesn't have a product $productRef.") if !defined $product; notFound($c, "Build doesn't have a product $productRef.") if !defined $product;
notFound($c, "Build product " . $product->path . " has disappeared.") unless -e $product->path; if ($product->path !~ /^($Nix::Config::storeDir\/[^\/]+)/) {
die "Invalid store path " . $product->path . ".\n";
}
my $storePath = $1;
# Hack to get downloads to work on binary cache stores: if the
# store path is not available locally, then import it into the
# local store. FIXME: find a better way; this can require an
# unbounded amount of space.
if (!isValidPath($storePath)) {
my $storeMode = $c->config->{store_mode} // "direct";
notFound($c, "File " . $product->path . " has disappeared.")
if $storeMode eq "direct";
my $url =
$storeMode eq "local-binary-cache" ? "file://" . $c->config->{binary_cache_dir} :
$storeMode eq "s3-binary-cache" ? "https://" . $c->config->{binary_cache_s3_bucket} . ".s3.amazonaws.com/" :
die;
my $args = "";
if (defined $c->config->{binary_cache_public_key_file}
&& -r $c->config->{binary_cache_public_key_file})
{
$args = "--option binary-cache-public-keys " . read_file($c->config->{binary_cache_public_key_file});
}
system("nix-store --realise '$storePath' --option extra-binary-caches '$url' $args>/dev/null");
}
notFound($c, "File " . $product->path . " does not exist.") unless -e $product->path;
return $c->res->redirect(defaultUriForProduct($self, $c, $product, @path)) return $c->res->redirect(defaultUriForProduct($self, $c, $product, @path))
if scalar @path == 0 && ($product->name || $product->defaultpath); if scalar @path == 0 && ($product->name || $product->defaultpath);

View file

@ -118,6 +118,22 @@ sub status_GET {
} }
sub queue_runner_status :Local :Path('queue-runner-status') :Args(0) :ActionClass('REST') { }
sub queue_runner_status_GET {
my ($self, $c) = @_;
#my $status = from_json($c->model('DB::SystemStatus')->find('queue-runner')->status);
my $status = from_json(`hydra-queue-runner --status`);
if ($?) { $status->{status} = "unknown"; }
my $json = JSON->new->pretty()->canonical();
$c->stash->{template} = 'queue-runner-status.tt';
$c->stash->{status} = $json->encode($status);
$self->status_ok($c, entity => $status);
}
sub machines :Local Args(0) { sub machines :Local Args(0) {
my ($self, $c) = @_; my ($self, $c) = @_;
my $machines = getMachines; my $machines = getMachines;
@ -241,32 +257,73 @@ sub serialize : ActionClass('Serialize') { }
sub nar :Local :Args(1) { sub nar :Local :Args(1) {
my ($self, $c, $path) = @_; my ($self, $c, $path) = @_;
$path = ($ENV{NIX_STORE_DIR} || "/nix/store")."/$path"; die if $path =~ /\//;
my $storeMode = $c->config->{store_mode} // "direct";
if ($storeMode eq "s3-binary-cache") {
notFound($c, "There is no binary cache here.");
}
elsif ($storeMode eq "local-binary-cache") {
my $dir = $c->config->{binary_cache_dir};
$c->serve_static_file($dir . "/nar/" . $path);
}
else {
$path = $Nix::Config::storeDir . "/$path";
gone($c, "Path " . $path . " is no longer available.") unless isValidPath($path); gone($c, "Path " . $path . " is no longer available.") unless isValidPath($path);
$c->stash->{current_view} = 'NixNAR'; $c->stash->{current_view} = 'NixNAR';
$c->stash->{storePath} = $path; $c->stash->{storePath} = $path;
}
} }
sub nix_cache_info :Path('nix-cache-info') :Args(0) { sub nix_cache_info :Path('nix-cache-info') :Args(0) {
my ($self, $c) = @_; my ($self, $c) = @_;
my $storeMode = $c->config->{store_mode} // "direct";
if ($storeMode eq "s3-binary-cache") {
notFound($c, "There is no binary cache here.");
}
elsif ($storeMode eq "local-binary-cache") {
my $dir = $c->config->{binary_cache_dir};
$c->serve_static_file($dir . "/nix-cache-info");
}
else {
$c->response->content_type('text/plain'); $c->response->content_type('text/plain');
$c->stash->{plain}->{data} = $c->stash->{plain}->{data} =
#"StoreDir: $Nix::Config::storeDir\n" . # FIXME "StoreDir: $Nix::Config::storeDir\n" .
"StoreDir: " . ($ENV{NIX_STORE_DIR} || "/nix/store") . "\n" .
"WantMassQuery: 0\n" . "WantMassQuery: 0\n" .
# Give Hydra binary caches a very low priority (lower than the # Give Hydra binary caches a very low priority (lower than the
# static binary cache http://nixos.org/binary-cache). # static binary cache http://nixos.org/binary-cache).
"Priority: 100\n"; "Priority: 100\n";
setCacheHeaders($c, 24 * 60 * 60); setCacheHeaders($c, 24 * 60 * 60);
$c->forward('Hydra::View::Plain'); $c->forward('Hydra::View::Plain');
}
} }
sub narinfo :LocalRegex('^([a-z0-9]+).narinfo$') :Args(0) { sub narinfo :LocalRegex('^([a-z0-9]+).narinfo$') :Args(0) {
my ($self, $c) = @_; my ($self, $c) = @_;
my $storeMode = $c->config->{store_mode} // "direct";
if ($storeMode eq "s3-binary-cache") {
notFound($c, "There is no binary cache here.");
}
elsif ($storeMode eq "local-binary-cache") {
my $dir = $c->config->{binary_cache_dir};
$c->serve_static_file($dir . "/" . $c->req->captures->[0] . ".narinfo");
}
else {
my $hash = $c->req->captures->[0]; my $hash = $c->req->captures->[0];
die if length($hash) != 32; die if length($hash) != 32;
@ -283,6 +340,7 @@ sub narinfo :LocalRegex('^([a-z0-9]+).narinfo$') :Args(0) {
$c->stash->{storePath} = $path; $c->stash->{storePath} = $path;
$c->forward('Hydra::View::NARInfo'); $c->forward('Hydra::View::NARInfo');
}
} }

View file

@ -36,13 +36,10 @@ sub process {
# Optionally, sign the NAR info file we just created. # Optionally, sign the NAR info file we just created.
my $secretKeyFile = $c->config->{binary_cache_secret_key_file}; my $secretKeyFile = $c->config->{binary_cache_secret_key_file};
if (defined $secretKeyFile) { if (defined $secretKeyFile) {
my $s = readFile $secretKeyFile; my $secretKey = readFile $secretKeyFile;
chomp $s;
my ($keyName, $secretKey) = split ":", $s;
die "invalid secret key file\n" unless defined $keyName && defined $secretKey;
my $fingerprint = fingerprintPath($storePath, $narHash, $narSize, $refs); my $fingerprint = fingerprintPath($storePath, $narHash, $narSize, $refs);
my $sig = encode_base64(signString(decode_base64($secretKey), $fingerprint), ""); my $sig = signString($secretKey, $fingerprint);
$info .= "Sig: $keyName:$sig\n"; $info .= "Sig: $sig\n";
} }
setCacheHeaders($c, 24 * 60 * 60); setCacheHeaders($c, 24 * 60 * 60);

View file

@ -0,0 +1,8 @@
[% WRAPPER layout.tt title="Queue runner status" %]
[% PROCESS common.tt %]
<pre>
[% HTML.escape(status) %]
</pre>
[% END %]

View file

@ -1,3 +1,3 @@
#! /bin/sh #! /bin/sh
mkdir -p $out/nix-support/failed mkdir -p $out/nix-support
touch $out/nix-support/failed