Automatically reload $NIX_REMOTE_SYSTEMS when it changes

Otherwise, you'd have to restart the queue runner to add or remove
machines.
This commit is contained in:
Eelco Dolstra 2015-06-25 12:24:11 +02:00
parent 1a0e1eb5a0
commit 32210905d8

View file

@ -180,10 +180,15 @@ struct Machine
unsigned int maxJobs = 1;
float speedFactor = 1.0;
counter currentJobs{0};
counter nrStepsDone{0};
counter totalStepTime{0}; // total time for steps, including closure copying
counter totalStepBuildTime{0}; // total build time for steps
struct State {
typedef std::shared_ptr<State> ptr;
counter currentJobs{0};
counter nrStepsDone{0};
counter totalStepTime{0}; // total time for steps, including closure copying
counter totalStepBuildTime{0}; // total build time for steps
};
State::ptr state;
bool supportsStep(Step::ptr step)
{
@ -197,23 +202,6 @@ struct Machine
};
/* A RAII helper that manages the currentJobs field of Machine
objects. */
struct MachineReservation
{
typedef std::shared_ptr<MachineReservation> ptr;
Machine::ptr machine;
MachineReservation(Machine::ptr machine) : machine(machine)
{
machine->currentJobs++;
}
~MachineReservation()
{
machine->currentJobs--;
}
};
class State
{
private:
@ -243,9 +231,12 @@ private:
Pool<Connection> dbPool;
/* The build machines. */
typedef std::list<Machine::ptr> Machines;
typedef std::map<string, Machine::ptr> Machines;
Sync<Machines> machines;
Path machinesFile;
struct stat machinesFileStat;
/* Token server limiting the number of threads copying closures in
parallel to prevent excessive I/O load. */
TokenServer copyClosureTokenServer{maxParallelCopyClosure};
@ -285,7 +276,11 @@ private:
void clearBusy(Connection & conn, time_t stopTime);
void loadMachines();
/* (Re)load /etc/nix/machines. */
void loadMachinesFile();
/* Thread to reload /etc/nix/machines periodically. */
void monitorMachinesFile();
int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step,
const std::string & machine, BuildStepStatus status, const std::string & errorMsg = "",
@ -316,7 +311,7 @@ private:
void wakeDispatcher();
void builder(Step::ptr step, MachineReservation::ptr reservation);
void builder(Step::ptr step, Machine::ptr machine, std::shared_ptr<MaintainCount> reservation);
/* Perform the given build step. Return true if the step is to be
retried. */
@ -357,54 +352,95 @@ State::State()
if (hydraData == "") throw Error("$HYDRA_DATA must be set");
logDir = canonPath(hydraData + "/build-logs");
machinesFile = getEnv("NIX_REMOTE_SYSTEMS", "/etc/nix/machines");
machinesFileStat.st_ino = 0;
machinesFileStat.st_mtime = 0;
}
void State::loadMachines()
void State::loadMachinesFile()
{
Path machinesFile = getEnv("NIX_REMOTE_SYSTEMS", "/etc/nix/machines");
Machines newMachines;
string contents;
if (pathExists(machinesFile)) {
for (auto line : tokenizeString<Strings>(readFile(machinesFile), "\n")) {
line = trim(string(line, 0, line.find('#')));
auto tokens = tokenizeString<std::vector<std::string>>(line);
if (tokens.size() < 3) continue;
tokens.resize(7);
auto machine = std::make_shared<Machine>();
machine->sshName = tokens[0];
machine->systemTypes = tokenizeString<StringSet>(tokens[1], ",");
machine->sshKey = tokens[2];
if (tokens[3] != "")
string2Int(tokens[3], machine->maxJobs);
else
machine->maxJobs = 1;
machine->speedFactor = atof(tokens[4].c_str());
machine->supportedFeatures = tokenizeString<StringSet>(tokens[5], ",");
machine->mandatoryFeatures = tokenizeString<StringSet>(tokens[6], ",");
for (auto & f : machine->mandatoryFeatures)
machine->supportedFeatures.insert(f);
newMachines.push_back(machine);
}
struct stat st;
if (stat(machinesFile.c_str(), &st) != 0)
throw SysError(format("getting stats about %1%") % machinesFile);
if (st.st_ino == machinesFileStat.st_ino && st.st_mtime == machinesFileStat.st_mtime)
return;
printMsg(lvlDebug, "reloading machines");
contents = readFile(machinesFile);
machinesFileStat = st;
} else {
auto machine = std::make_shared<Machine>();
machine->sshName = "localhost";
machine->systemTypes = StringSet({settings.thisSystem});
StringSet systems = StringSet({settings.thisSystem});
if (settings.thisSystem == "x86_64-linux")
machine->systemTypes.insert("i686-linux");
machine->maxJobs = settings.maxBuildJobs;
newMachines.push_back(machine);
systems.insert("i686-linux");
contents = "localhost " + concatStringsSep(",", systems)
+ " - " + int2String(settings.maxBuildJobs) + " 1";
}
Machines newMachines, oldMachines;
{
auto machines_(machines.lock());
oldMachines = *machines_;
}
for (auto line : tokenizeString<Strings>(contents, "\n")) {
line = trim(string(line, 0, line.find('#')));
auto tokens = tokenizeString<std::vector<std::string>>(line);
if (tokens.size() < 3) continue;
tokens.resize(7);
auto machine = std::make_shared<Machine>();
machine->sshName = tokens[0];
machine->systemTypes = tokenizeString<StringSet>(tokens[1], ",");
machine->sshKey = tokens[2];
if (tokens[3] != "")
string2Int(tokens[3], machine->maxJobs);
else
machine->maxJobs = 1;
machine->speedFactor = atof(tokens[4].c_str());
machine->supportedFeatures = tokenizeString<StringSet>(tokens[5], ",");
machine->mandatoryFeatures = tokenizeString<StringSet>(tokens[6], ",");
for (auto & f : machine->mandatoryFeatures)
machine->supportedFeatures.insert(f);
/* Re-use the State object of the previous machine with the
same name. */
auto i = oldMachines.find(machine->sshName);
if (i == oldMachines.end())
printMsg(lvlChatty, format("adding new machine %1%") % machine->sshName);
else
printMsg(lvlChatty, format("updating machine %1%") % machine->sshName);
machine->state = i == oldMachines.end()
? std::make_shared<Machine::State>()
: i->second->state;
newMachines[machine->sshName] = machine;
}
for (auto & m : oldMachines)
if (newMachines.find(m.first) == newMachines.end())
printMsg(lvlInfo, format("removing machine %1%") % m.first);
auto machines_(machines.lock());
*machines_ = newMachines;
}
void State::monitorMachinesFile()
{
while (true) {
try {
// FIXME: use inotify.
sleep(60);
loadMachinesFile();
} catch (std::exception & e) {
printMsg(lvlError, format("reloading machines file: %1%") % e.what());
}
}
}
void State::clearBusy(Connection & conn, time_t stopTime)
{
pqxx::work txn(conn);
@ -619,7 +655,7 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store,
{
auto machines_(machines.lock()); // FIXME: use shared_mutex
for (auto & m : *machines_)
if (m->supportsStep(r)) { supported = true; break; }
if (m.second->supportsStep(r)) { supported = true; break; }
}
if (!supported) {
@ -896,7 +932,7 @@ void State::dispatcher()
{
auto machines_(machines.lock());
for (auto & m : *machines_)
machinesSorted.push_back({m, m->currentJobs});
machinesSorted.push_back({m.second, m.second->state->currentJobs});
}
/* Sort the machines by a combination of speed factor and
@ -929,7 +965,7 @@ void State::dispatcher()
for (auto & mi : machinesSorted) {
// FIXME: can we lose a wakeup if a builder exits concurrently?
if (mi.machine->currentJobs >= mi.machine->maxJobs) continue;
if (mi.machine->state->currentJobs >= mi.machine->maxJobs) continue;
auto runnable_(runnable.lock());
//printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size());
@ -966,10 +1002,10 @@ void State::dispatcher()
/* Make a slot reservation and start a thread to
do the build. */
auto reservation = std::make_shared<MachineReservation>(mi.machine);
auto reservation = std::make_shared<MaintainCount>(mi.machine->state->currentJobs);
i = runnable_->erase(i);
auto builderThread = std::thread(&State::builder, this, step, reservation);
auto builderThread = std::thread(&State::builder, this, step, mi.machine, reservation);
builderThread.detach(); // FIXME?
keepGoing = true;
@ -1003,7 +1039,7 @@ void State::wakeDispatcher()
}
void State::builder(Step::ptr step, MachineReservation::ptr reservation)
void State::builder(Step::ptr step, Machine::ptr machine, std::shared_ptr<MaintainCount> reservation)
{
bool retry = true;
@ -1011,10 +1047,10 @@ void State::builder(Step::ptr step, MachineReservation::ptr reservation)
try {
auto store = openStore(); // FIXME: pool
retry = doBuildStep(store, step, reservation->machine);
retry = doBuildStep(store, step, machine);
} catch (std::exception & e) {
printMsg(lvlError, format("uncaught exception building %1% on %2%: %3%")
% step->drvPath % reservation->machine->sshName % e.what());
% step->drvPath % machine->sshName % e.what());
}
/* Release the machine and wake up the dispatcher. */
@ -1359,9 +1395,9 @@ bool State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
nrStepsDone++;
totalStepTime += stepStopTime - stepStartTime;
totalStepBuildTime += result.stopTime - result.startTime;
machine->nrStepsDone++;
machine->totalStepTime += stepStopTime - stepStartTime;
machine->totalStepBuildTime += result.stopTime - result.startTime;
machine->state->nrStepsDone++;
machine->state->totalStepTime += stepStopTime - stepStartTime;
machine->state->totalStepBuildTime += result.stopTime - result.startTime;
return false;
}
@ -1564,16 +1600,18 @@ void State::dumpStatus(Connection & conn, bool log)
root.attr("machines");
JSONObject nested(out);
auto machines_(machines.lock());
for (auto & m : *machines_) {
for (auto & i : *machines_) {
auto & m(i.second);
auto & s(m->state);
nested.attr(m->sshName);
JSONObject nested2(out);
nested2.attr("currentJobs", m->currentJobs);
nested2.attr("nrStepsDone", m->nrStepsDone);
if (m->nrStepsDone) {
nested2.attr("totalStepTime", m->totalStepTime);
nested2.attr("totalStepBuildTime", m->totalStepBuildTime);
nested2.attr("avgStepTime"); out << (float) m->totalStepTime / m->nrStepsDone;
nested2.attr("avgStepBuildTime"); out << (float) m->totalStepBuildTime / m->nrStepsDone;
nested2.attr("currentJobs", s->currentJobs);
nested2.attr("nrStepsDone", s->nrStepsDone);
if (m->state->nrStepsDone) {
nested2.attr("totalStepTime", s->totalStepTime);
nested2.attr("totalStepBuildTime", s->totalStepBuildTime);
nested2.attr("avgStepTime"); out << (float) s->totalStepTime / s->nrStepsDone;
nested2.attr("avgStepBuildTime"); out << (float) s->totalStepBuildTime / s->nrStepsDone;
}
}
}
@ -1670,7 +1708,9 @@ void State::run()
dumpStatus(*conn, false);
}
loadMachines();
loadMachinesFile();
std::thread(&State::monitorMachinesFile, this).detach();
std::thread(&State::queueMonitor, this).detach();