diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc
index ca35d3fb..68d000cb 100644
--- a/src/hydra-queue-runner/hydra-queue-runner.cc
+++ b/src/hydra-queue-runner/hydra-queue-runner.cc
@@ -25,6 +25,7 @@
 #include "derivations.hh"
 #include "shared.hh"
 #include "globals.hh"
+#include "value-to-json.hh"
 
 using namespace nix;
 
@@ -256,12 +257,10 @@ private:
 public:
     State();
 
-    ~State();
-
-    void clearBusy(time_t stopTime);
-
 private:
 
+    void clearBusy(Connection & conn, time_t stopTime);
+
     void loadMachines();
 
     int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step,
@@ -308,9 +307,13 @@ private:
     /* Thread that asynchronously bzips logs of finished steps. */
     void logCompressor();
 
+    void dumpStatus(Connection & conn);
+
 public:
 
-    void dumpStatus();
+    void showStatus();
+
+    void unlock();
 
     void run();
 };
@@ -325,17 +328,6 @@ State::State()
 }
 
 
-State::~State()
-{
-    try {
-        printMsg(lvlInfo, "clearing active builds / build steps...");
-        clearBusy(time(0));
-    } catch (...) {
-        ignoreException();
-    }
-}
-
-
 void State::loadMachines()
 {
     Path machinesFile = getEnv("NIX_REMOTE_SYSTEMS", "/etc/nix/machines");
@@ -381,10 +373,9 @@ void State::loadMachines()
 }
 
 
-void State::clearBusy(time_t stopTime)
+void State::clearBusy(Connection & conn, time_t stopTime)
 {
-    auto conn(dbPool.get());
-    pqxx::work txn(*conn);
+    pqxx::work txn(conn);
     txn.parameterized
         ("update BuildSteps set busy = 0, status = $1, stopTime = $2 where busy = 1")
         ((int) bssAborted)
@@ -1405,47 +1396,132 @@ void State::logCompressor()
 }
 
 
-void State::dumpStatus()
+void State::dumpStatus(Connection & conn)
 {
+    std::ostringstream out;
+
     {
-        auto builds_(builds.lock());
-        printMsg(lvlError, format("%1% queued builds") % builds_->size());
-    }
-    {
-        auto steps_(steps.lock());
-        for (auto i = steps_->begin(); i != steps_->end(); )
-            if (i->second.lock()) ++i; else i = steps_->erase(i);
-        printMsg(lvlError, format("%1% pending/active build steps") % steps_->size());
-    }
-    {
-        auto runnable_(runnable.lock());
-        for (auto i = runnable_->begin(); i != runnable_->end(); )
-            if (i->lock()) ++i; else i = runnable_->erase(i);
-        printMsg(lvlError, format("%1% runnable build steps") % runnable_->size());
-    }
-    printMsg(lvlError, format("%1% active build steps") % nrActiveSteps);
-    printMsg(lvlError, format("%1% build steps currently building") % nrStepsBuilding);
-    printMsg(lvlError, format("%1% builds read from queue") % nrBuildsRead);
-    printMsg(lvlError, format("%1% builds done") % nrBuildsDone);
-    printMsg(lvlError, format("%1% build steps done") % nrStepsDone);
-    printMsg(lvlError, format("%1% build step retries") % nrRetries);
-    printMsg(lvlError, format("%1% most retries for any build step") % maxNrRetries);
-    printMsg(lvlError, format("%1% queue wakeups") % nrQueueWakeups);
-    printMsg(lvlError, format("%1% dispatcher wakeups") % nrDispatcherWakeups);
-    printMsg(lvlError, format("%1% database connections") % dbPool.count());
-    {
-        auto machines_(machines.lock());
-        for (auto & m : *machines_) {
-            printMsg(lvlError, format("machine %1%: %2%/%3% active")
-                % m->sshName % m->currentJobs % m->maxJobs);
+        JSONObject root(out);
+        root.attr("status", "up");
+        root.attr("time", time(0));
+        root.attr("pid", getpid());
+        {
+            auto builds_(builds.lock());
+            root.attr("nrQueuedBuilds", builds_->size());
         }
+        {
+            auto steps_(steps.lock());
+            for (auto i = steps_->begin(); i != steps_->end(); )
+                if (i->second.lock()) ++i; else i = steps_->erase(i);
+            root.attr("nrUnfinishedSteps", steps_->size());
+        }
+        {
+            auto runnable_(runnable.lock());
+            for (auto i = runnable_->begin(); i != runnable_->end(); )
+                if (i->lock()) ++i; else i = runnable_->erase(i);
+            root.attr("nrRunnableSteps", runnable_->size());
+        }
+        root.attr("nrActiveSteps", nrActiveSteps);
+        root.attr("nrStepsBuilding", nrStepsBuilding);
+        root.attr("nrBuildsRead", nrBuildsRead);
+        root.attr("nrBuildsDone", nrBuildsDone);
+        root.attr("nrStepsDone", nrStepsDone);
+        root.attr("nrRetries", nrRetries);
+        root.attr("maxNrRetries", maxNrRetries);
+        root.attr("nrQueueWakeups", nrQueueWakeups);
+        root.attr("nrDispatcherWakeups", nrDispatcherWakeups);
+        root.attr("nrDbConnections", dbPool.count());
+        {
+            root.attr("machines");
+            JSONObject nested(out);
+            auto machines_(machines.lock());
+            for (auto & m : *machines_) {
+                nested.attr(m->sshName);
+                JSONObject nested2(out);
+                nested2.attr("currentJobs", m->currentJobs);
+                nested2.attr("maxJobs", m->maxJobs);
+            }
+        }
+    }
+
+    {
+        pqxx::work txn(conn);
+        // FIXME: use PostgreSQL 9.5 upsert.
+        txn.exec("delete from SystemStatus where what = 'queue-runner'");
+        txn.parameterized("insert into SystemStatus values ('queue-runner', $1)")(out.str()).exec();
+        txn.exec("notify status_dumped");
+        txn.commit();
+    }
+}
+
+
+void State::showStatus()
+{
+    auto conn(dbPool.get());
+    receiver statusDumped(*conn, "status_dumped");
+
+    string status;
+    bool barf = false;
+
+    /* Get the last JSON status dump from the database. */
+    {
+        pqxx::work txn(*conn);
+        auto res = txn.exec("select status from SystemStatus where what = 'queue-runner'");
+        if (res.size()) status = res[0][0].as<string>();
+    }
+
+    if (status != "") {
+
+        /* If the status is not empty, then the queue runner is
+           running. Ask it to update the status dump. */
+        {
+            pqxx::work txn(*conn);
+            txn.exec("notify dump_status");
+            txn.commit();
+        }
+
+        /* Wait until it has done so. */
+        barf = conn->await_notification(5, 0) == 0;
+
+        /* Get the new status. */
+        {
+            pqxx::work txn(*conn);
+            auto res = txn.exec("select status from SystemStatus where what = 'queue-runner'");
+            if (res.size()) status = res[0][0].as<string>();
+        }
+
+    }
+
+    if (status == "") status = R"({"status":"down"})";
+
+    std::cout << status << "\n";
+
+    if (barf)
+        throw Error("queue runner did not respond; status information may be wrong");
+}
+
+
+void State::unlock()
+{
+    auto conn(dbPool.get());
+
+    clearBusy(*conn, 0);
+
+    {
+        pqxx::work txn(*conn);
+        txn.exec("delete from SystemStatus where what = 'queue-runner'");
+        txn.commit();
     }
 }
 
 
 void State::run()
 {
-    clearBusy(0);
+    {
+        auto conn(dbPool.get());
+        clearBusy(*conn, 0);
+        dumpStatus(*conn);
+    }
 
     loadMachines();
 
@@ -1464,7 +1540,7 @@ void State::run()
             while (true) {
                 conn->await_notification();
                 if (dumpStatus.get())
-                    State::dumpStatus();
+                    State::dumpStatus(*conn);
             }
         } catch (std::exception & e) {
             printMsg(lvlError, format("main thread: %1%") % e.what());
@@ -1487,10 +1563,13 @@ int main(int argc, char * * argv)
         signal(SIGHUP, SIG_DFL);
 
         bool unlock = false;
+        bool status = false;
 
         parseCmdLine(argc, argv, [&](Strings::iterator & arg, const Strings::iterator & end) {
             if (*arg == "--unlock")
                 unlock = true;
+            else if (*arg == "--status")
+                status = true;
             else
                 return false;
             return true;
@@ -1503,8 +1582,10 @@ int main(int argc, char * * argv)
         /* FIXME: need some locking to prevent multiple instances of
            hydra-queue-runner. */
         State state;
-        if (unlock)
-            state.clearBusy(0);
+        if (status)
+            state.showStatus();
+        else if (unlock)
+            state.unlock();
         else
             state.run();
     });
diff --git a/src/sql/hydra.sql b/src/sql/hydra.sql
index 10f2d614..2bbb04d4 100644
--- a/src/sql/hydra.sql
+++ b/src/sql/hydra.sql
@@ -531,6 +531,12 @@ create rule IdempotentInsert as on insert to FailedPaths
 #endif
 
 
+create table SystemStatus (
+    what text primary key not null,
+    status json not null
+);
+
+
 -- Cache of the number of finished builds.
 create table NrBuilds (
     what text primary key not null,
diff --git a/src/sql/upgrade-34.sql b/src/sql/upgrade-34.sql
new file mode 100644
index 00000000..8e93cbcc
--- /dev/null
+++ b/src/sql/upgrade-34.sql
@@ -0,0 +1,4 @@
+create table SystemStatus (
+    what text primary key not null,
+    status json not null
+);