diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index f6ba6179..ccb3e829 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -180,10 +180,15 @@ struct Machine unsigned int maxJobs = 1; float speedFactor = 1.0; - counter currentJobs{0}; - counter nrStepsDone{0}; - counter totalStepTime{0}; // total time for steps, including closure copying - counter totalStepBuildTime{0}; // total build time for steps + struct State { + typedef std::shared_ptr ptr; + counter currentJobs{0}; + counter nrStepsDone{0}; + counter totalStepTime{0}; // total time for steps, including closure copying + counter totalStepBuildTime{0}; // total build time for steps + }; + + State::ptr state; bool supportsStep(Step::ptr step) { @@ -197,23 +202,6 @@ struct Machine }; -/* A RAII helper that manages the currentJobs field of Machine - objects. */ -struct MachineReservation -{ - typedef std::shared_ptr ptr; - Machine::ptr machine; - MachineReservation(Machine::ptr machine) : machine(machine) - { - machine->currentJobs++; - } - ~MachineReservation() - { - machine->currentJobs--; - } -}; - - class State { private: @@ -243,9 +231,12 @@ private: Pool dbPool; /* The build machines. */ - typedef std::list Machines; + typedef std::map Machines; Sync machines; + Path machinesFile; + struct stat machinesFileStat; + /* Token server limiting the number of threads copying closures in parallel to prevent excessive I/O load. */ TokenServer copyClosureTokenServer{maxParallelCopyClosure}; @@ -285,7 +276,11 @@ private: void clearBusy(Connection & conn, time_t stopTime); - void loadMachines(); + /* (Re)load /etc/nix/machines. */ + void loadMachinesFile(); + + /* Thread to reload /etc/nix/machines periodically. */ + void monitorMachinesFile(); int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step, const std::string & machine, BuildStepStatus status, const std::string & errorMsg = "", @@ -316,7 +311,7 @@ private: void wakeDispatcher(); - void builder(Step::ptr step, MachineReservation::ptr reservation); + void builder(Step::ptr step, Machine::ptr machine, std::shared_ptr reservation); /* Perform the given build step. Return true if the step is to be retried. */ @@ -357,54 +352,95 @@ State::State() if (hydraData == "") throw Error("$HYDRA_DATA must be set"); logDir = canonPath(hydraData + "/build-logs"); + + machinesFile = getEnv("NIX_REMOTE_SYSTEMS", "/etc/nix/machines"); + machinesFileStat.st_ino = 0; + machinesFileStat.st_mtime = 0; } -void State::loadMachines() +void State::loadMachinesFile() { - Path machinesFile = getEnv("NIX_REMOTE_SYSTEMS", "/etc/nix/machines"); - - Machines newMachines; - + string contents; if (pathExists(machinesFile)) { - - for (auto line : tokenizeString(readFile(machinesFile), "\n")) { - line = trim(string(line, 0, line.find('#'))); - auto tokens = tokenizeString>(line); - if (tokens.size() < 3) continue; - tokens.resize(7); - - auto machine = std::make_shared(); - machine->sshName = tokens[0]; - machine->systemTypes = tokenizeString(tokens[1], ","); - machine->sshKey = tokens[2]; - if (tokens[3] != "") - string2Int(tokens[3], machine->maxJobs); - else - machine->maxJobs = 1; - machine->speedFactor = atof(tokens[4].c_str()); - machine->supportedFeatures = tokenizeString(tokens[5], ","); - machine->mandatoryFeatures = tokenizeString(tokens[6], ","); - for (auto & f : machine->mandatoryFeatures) - machine->supportedFeatures.insert(f); - newMachines.push_back(machine); - } - + struct stat st; + if (stat(machinesFile.c_str(), &st) != 0) + throw SysError(format("getting stats about ‘%1%’") % machinesFile); + if (st.st_ino == machinesFileStat.st_ino && st.st_mtime == machinesFileStat.st_mtime) + return; + printMsg(lvlDebug, "reloading machines"); + contents = readFile(machinesFile); + machinesFileStat = st; } else { - auto machine = std::make_shared(); - machine->sshName = "localhost"; - machine->systemTypes = StringSet({settings.thisSystem}); + StringSet systems = StringSet({settings.thisSystem}); if (settings.thisSystem == "x86_64-linux") - machine->systemTypes.insert("i686-linux"); - machine->maxJobs = settings.maxBuildJobs; - newMachines.push_back(machine); + systems.insert("i686-linux"); + contents = "localhost " + concatStringsSep(",", systems) + + " - " + int2String(settings.maxBuildJobs) + " 1"; } + Machines newMachines, oldMachines; + { + auto machines_(machines.lock()); + oldMachines = *machines_; + } + + for (auto line : tokenizeString(contents, "\n")) { + line = trim(string(line, 0, line.find('#'))); + auto tokens = tokenizeString>(line); + if (tokens.size() < 3) continue; + tokens.resize(7); + + auto machine = std::make_shared(); + machine->sshName = tokens[0]; + machine->systemTypes = tokenizeString(tokens[1], ","); + machine->sshKey = tokens[2]; + if (tokens[3] != "") + string2Int(tokens[3], machine->maxJobs); + else + machine->maxJobs = 1; + machine->speedFactor = atof(tokens[4].c_str()); + machine->supportedFeatures = tokenizeString(tokens[5], ","); + machine->mandatoryFeatures = tokenizeString(tokens[6], ","); + for (auto & f : machine->mandatoryFeatures) + machine->supportedFeatures.insert(f); + + /* Re-use the State object of the previous machine with the + same name. */ + auto i = oldMachines.find(machine->sshName); + if (i == oldMachines.end()) + printMsg(lvlChatty, format("adding new machine ‘%1%’") % machine->sshName); + else + printMsg(lvlChatty, format("updating machine ‘%1%’") % machine->sshName); + machine->state = i == oldMachines.end() + ? std::make_shared() + : i->second->state; + newMachines[machine->sshName] = machine; + } + + for (auto & m : oldMachines) + if (newMachines.find(m.first) == newMachines.end()) + printMsg(lvlInfo, format("removing machine ‘%1%’") % m.first); + auto machines_(machines.lock()); *machines_ = newMachines; } +void State::monitorMachinesFile() +{ + while (true) { + try { + // FIXME: use inotify. + sleep(60); + loadMachinesFile(); + } catch (std::exception & e) { + printMsg(lvlError, format("reloading machines file: %1%") % e.what()); + } + } +} + + void State::clearBusy(Connection & conn, time_t stopTime) { pqxx::work txn(conn); @@ -619,7 +655,7 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr store, { auto machines_(machines.lock()); // FIXME: use shared_mutex for (auto & m : *machines_) - if (m->supportsStep(r)) { supported = true; break; } + if (m.second->supportsStep(r)) { supported = true; break; } } if (!supported) { @@ -896,7 +932,7 @@ void State::dispatcher() { auto machines_(machines.lock()); for (auto & m : *machines_) - machinesSorted.push_back({m, m->currentJobs}); + machinesSorted.push_back({m.second, m.second->state->currentJobs}); } /* Sort the machines by a combination of speed factor and @@ -929,7 +965,7 @@ void State::dispatcher() for (auto & mi : machinesSorted) { // FIXME: can we lose a wakeup if a builder exits concurrently? - if (mi.machine->currentJobs >= mi.machine->maxJobs) continue; + if (mi.machine->state->currentJobs >= mi.machine->maxJobs) continue; auto runnable_(runnable.lock()); //printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size()); @@ -966,10 +1002,10 @@ void State::dispatcher() /* Make a slot reservation and start a thread to do the build. */ - auto reservation = std::make_shared(mi.machine); + auto reservation = std::make_shared(mi.machine->state->currentJobs); i = runnable_->erase(i); - auto builderThread = std::thread(&State::builder, this, step, reservation); + auto builderThread = std::thread(&State::builder, this, step, mi.machine, reservation); builderThread.detach(); // FIXME? keepGoing = true; @@ -1003,7 +1039,7 @@ void State::wakeDispatcher() } -void State::builder(Step::ptr step, MachineReservation::ptr reservation) +void State::builder(Step::ptr step, Machine::ptr machine, std::shared_ptr reservation) { bool retry = true; @@ -1011,10 +1047,10 @@ void State::builder(Step::ptr step, MachineReservation::ptr reservation) try { auto store = openStore(); // FIXME: pool - retry = doBuildStep(store, step, reservation->machine); + retry = doBuildStep(store, step, machine); } catch (std::exception & e) { printMsg(lvlError, format("uncaught exception building ‘%1%’ on ‘%2%’: %3%") - % step->drvPath % reservation->machine->sshName % e.what()); + % step->drvPath % machine->sshName % e.what()); } /* Release the machine and wake up the dispatcher. */ @@ -1359,9 +1395,9 @@ bool State::doBuildStep(std::shared_ptr store, Step::ptr step, nrStepsDone++; totalStepTime += stepStopTime - stepStartTime; totalStepBuildTime += result.stopTime - result.startTime; - machine->nrStepsDone++; - machine->totalStepTime += stepStopTime - stepStartTime; - machine->totalStepBuildTime += result.stopTime - result.startTime; + machine->state->nrStepsDone++; + machine->state->totalStepTime += stepStopTime - stepStartTime; + machine->state->totalStepBuildTime += result.stopTime - result.startTime; return false; } @@ -1564,16 +1600,18 @@ void State::dumpStatus(Connection & conn, bool log) root.attr("machines"); JSONObject nested(out); auto machines_(machines.lock()); - for (auto & m : *machines_) { + for (auto & i : *machines_) { + auto & m(i.second); + auto & s(m->state); nested.attr(m->sshName); JSONObject nested2(out); - nested2.attr("currentJobs", m->currentJobs); - nested2.attr("nrStepsDone", m->nrStepsDone); - if (m->nrStepsDone) { - nested2.attr("totalStepTime", m->totalStepTime); - nested2.attr("totalStepBuildTime", m->totalStepBuildTime); - nested2.attr("avgStepTime"); out << (float) m->totalStepTime / m->nrStepsDone; - nested2.attr("avgStepBuildTime"); out << (float) m->totalStepBuildTime / m->nrStepsDone; + nested2.attr("currentJobs", s->currentJobs); + nested2.attr("nrStepsDone", s->nrStepsDone); + if (m->state->nrStepsDone) { + nested2.attr("totalStepTime", s->totalStepTime); + nested2.attr("totalStepBuildTime", s->totalStepBuildTime); + nested2.attr("avgStepTime"); out << (float) s->totalStepTime / s->nrStepsDone; + nested2.attr("avgStepBuildTime"); out << (float) s->totalStepBuildTime / s->nrStepsDone; } } } @@ -1670,7 +1708,9 @@ void State::run() dumpStatus(*conn, false); } - loadMachines(); + loadMachinesFile(); + + std::thread(&State::monitorMachinesFile, this).detach(); std::thread(&State::queueMonitor, this).detach();