forked from lix-project/hydra
Add command ‘hydra-queue-runner --status’ to show current status
This commit is contained in:
parent
44a2b74f5a
commit
4f4141e1db
|
@ -25,6 +25,7 @@
|
|||
#include "derivations.hh"
|
||||
#include "shared.hh"
|
||||
#include "globals.hh"
|
||||
#include "value-to-json.hh"
|
||||
|
||||
using namespace nix;
|
||||
|
||||
|
@ -256,12 +257,10 @@ private:
|
|||
public:
|
||||
State();
|
||||
|
||||
~State();
|
||||
|
||||
void clearBusy(time_t stopTime);
|
||||
|
||||
private:
|
||||
|
||||
void clearBusy(Connection & conn, time_t stopTime);
|
||||
|
||||
void loadMachines();
|
||||
|
||||
int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step,
|
||||
|
@ -308,9 +307,13 @@ private:
|
|||
/* Thread that asynchronously bzips logs of finished steps. */
|
||||
void logCompressor();
|
||||
|
||||
void dumpStatus(Connection & conn);
|
||||
|
||||
public:
|
||||
|
||||
void dumpStatus();
|
||||
void showStatus();
|
||||
|
||||
void unlock();
|
||||
|
||||
void run();
|
||||
};
|
||||
|
@ -325,17 +328,6 @@ State::State()
|
|||
}
|
||||
|
||||
|
||||
State::~State()
|
||||
{
|
||||
try {
|
||||
printMsg(lvlInfo, "clearing active builds / build steps...");
|
||||
clearBusy(time(0));
|
||||
} catch (...) {
|
||||
ignoreException();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void State::loadMachines()
|
||||
{
|
||||
Path machinesFile = getEnv("NIX_REMOTE_SYSTEMS", "/etc/nix/machines");
|
||||
|
@ -381,10 +373,9 @@ void State::loadMachines()
|
|||
}
|
||||
|
||||
|
||||
void State::clearBusy(time_t stopTime)
|
||||
void State::clearBusy(Connection & conn, time_t stopTime)
|
||||
{
|
||||
auto conn(dbPool.get());
|
||||
pqxx::work txn(*conn);
|
||||
pqxx::work txn(conn);
|
||||
txn.parameterized
|
||||
("update BuildSteps set busy = 0, status = $1, stopTime = $2 where busy = 1")
|
||||
((int) bssAborted)
|
||||
|
@ -1405,47 +1396,132 @@ void State::logCompressor()
|
|||
}
|
||||
|
||||
|
||||
void State::dumpStatus()
|
||||
void State::dumpStatus(Connection & conn)
|
||||
{
|
||||
std::ostringstream out;
|
||||
|
||||
{
|
||||
auto builds_(builds.lock());
|
||||
printMsg(lvlError, format("%1% queued builds") % builds_->size());
|
||||
}
|
||||
{
|
||||
auto steps_(steps.lock());
|
||||
for (auto i = steps_->begin(); i != steps_->end(); )
|
||||
if (i->second.lock()) ++i; else i = steps_->erase(i);
|
||||
printMsg(lvlError, format("%1% pending/active build steps") % steps_->size());
|
||||
}
|
||||
{
|
||||
auto runnable_(runnable.lock());
|
||||
for (auto i = runnable_->begin(); i != runnable_->end(); )
|
||||
if (i->lock()) ++i; else i = runnable_->erase(i);
|
||||
printMsg(lvlError, format("%1% runnable build steps") % runnable_->size());
|
||||
}
|
||||
printMsg(lvlError, format("%1% active build steps") % nrActiveSteps);
|
||||
printMsg(lvlError, format("%1% build steps currently building") % nrStepsBuilding);
|
||||
printMsg(lvlError, format("%1% builds read from queue") % nrBuildsRead);
|
||||
printMsg(lvlError, format("%1% builds done") % nrBuildsDone);
|
||||
printMsg(lvlError, format("%1% build steps done") % nrStepsDone);
|
||||
printMsg(lvlError, format("%1% build step retries") % nrRetries);
|
||||
printMsg(lvlError, format("%1% most retries for any build step") % maxNrRetries);
|
||||
printMsg(lvlError, format("%1% queue wakeups") % nrQueueWakeups);
|
||||
printMsg(lvlError, format("%1% dispatcher wakeups") % nrDispatcherWakeups);
|
||||
printMsg(lvlError, format("%1% database connections") % dbPool.count());
|
||||
{
|
||||
auto machines_(machines.lock());
|
||||
for (auto & m : *machines_) {
|
||||
printMsg(lvlError, format("machine %1%: %2%/%3% active")
|
||||
% m->sshName % m->currentJobs % m->maxJobs);
|
||||
JSONObject root(out);
|
||||
root.attr("status", "up");
|
||||
root.attr("time", time(0));
|
||||
root.attr("pid", getpid());
|
||||
{
|
||||
auto builds_(builds.lock());
|
||||
root.attr("nrQueuedBuilds", builds_->size());
|
||||
}
|
||||
{
|
||||
auto steps_(steps.lock());
|
||||
for (auto i = steps_->begin(); i != steps_->end(); )
|
||||
if (i->second.lock()) ++i; else i = steps_->erase(i);
|
||||
root.attr("nrUnfinishedSteps", steps_->size());
|
||||
}
|
||||
{
|
||||
auto runnable_(runnable.lock());
|
||||
for (auto i = runnable_->begin(); i != runnable_->end(); )
|
||||
if (i->lock()) ++i; else i = runnable_->erase(i);
|
||||
root.attr("nrRunnableSteps", runnable_->size());
|
||||
}
|
||||
root.attr("nrActiveSteps", nrActiveSteps);
|
||||
root.attr("nrStepsBuilding", nrStepsBuilding);
|
||||
root.attr("nrBuildsRead", nrBuildsRead);
|
||||
root.attr("nrBuildsDone", nrBuildsDone);
|
||||
root.attr("nrStepsDone", nrStepsDone);
|
||||
root.attr("nrRetries", nrRetries);
|
||||
root.attr("maxNrRetries", maxNrRetries);
|
||||
root.attr("nrQueueWakeups", nrQueueWakeups);
|
||||
root.attr("nrDispatcherWakeups", nrDispatcherWakeups);
|
||||
root.attr("nrDbConnections", dbPool.count());
|
||||
{
|
||||
root.attr("machines");
|
||||
JSONObject nested(out);
|
||||
auto machines_(machines.lock());
|
||||
for (auto & m : *machines_) {
|
||||
nested.attr(m->sshName);
|
||||
JSONObject nested2(out);
|
||||
nested2.attr("currentJobs", m->currentJobs);
|
||||
nested2.attr("maxJobs", m->maxJobs);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
pqxx::work txn(conn);
|
||||
// FIXME: use PostgreSQL 9.5 upsert.
|
||||
txn.exec("delete from SystemStatus where what = 'queue-runner'");
|
||||
txn.parameterized("insert into SystemStatus values ('queue-runner', $1)")(out.str()).exec();
|
||||
txn.exec("notify status_dumped");
|
||||
txn.commit();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void State::showStatus()
|
||||
{
|
||||
auto conn(dbPool.get());
|
||||
receiver statusDumped(*conn, "status_dumped");
|
||||
|
||||
string status;
|
||||
bool barf = false;
|
||||
|
||||
/* Get the last JSON status dump from the database. */
|
||||
{
|
||||
pqxx::work txn(*conn);
|
||||
auto res = txn.exec("select status from SystemStatus where what = 'queue-runner'");
|
||||
if (res.size()) status = res[0][0].as<string>();
|
||||
}
|
||||
|
||||
if (status != "") {
|
||||
|
||||
/* If the status is not empty, then the queue runner is
|
||||
running. Ask it to update the status dump. */
|
||||
{
|
||||
pqxx::work txn(*conn);
|
||||
txn.exec("notify dump_status");
|
||||
txn.commit();
|
||||
}
|
||||
|
||||
/* Wait until it has done so. */
|
||||
barf = conn->await_notification(5, 0) == 0;
|
||||
|
||||
/* Get the new status. */
|
||||
{
|
||||
pqxx::work txn(*conn);
|
||||
auto res = txn.exec("select status from SystemStatus where what = 'queue-runner'");
|
||||
if (res.size()) status = res[0][0].as<string>();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if (status == "") status = R"({"status":"down"})";
|
||||
|
||||
std::cout << status << "\n";
|
||||
|
||||
if (barf)
|
||||
throw Error("queue runner did not respond; status information may be wrong");
|
||||
}
|
||||
|
||||
|
||||
void State::unlock()
|
||||
{
|
||||
auto conn(dbPool.get());
|
||||
|
||||
clearBusy(*conn, 0);
|
||||
|
||||
{
|
||||
pqxx::work txn(*conn);
|
||||
txn.exec("delete from SystemStatus where what = 'queue-runner'");
|
||||
txn.commit();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void State::run()
|
||||
{
|
||||
clearBusy(0);
|
||||
{
|
||||
auto conn(dbPool.get());
|
||||
clearBusy(*conn, 0);
|
||||
dumpStatus(*conn);
|
||||
}
|
||||
|
||||
loadMachines();
|
||||
|
||||
|
@ -1464,7 +1540,7 @@ void State::run()
|
|||
while (true) {
|
||||
conn->await_notification();
|
||||
if (dumpStatus.get())
|
||||
State::dumpStatus();
|
||||
State::dumpStatus(*conn);
|
||||
}
|
||||
} catch (std::exception & e) {
|
||||
printMsg(lvlError, format("main thread: %1%") % e.what());
|
||||
|
@ -1487,10 +1563,13 @@ int main(int argc, char * * argv)
|
|||
signal(SIGHUP, SIG_DFL);
|
||||
|
||||
bool unlock = false;
|
||||
bool status = false;
|
||||
|
||||
parseCmdLine(argc, argv, [&](Strings::iterator & arg, const Strings::iterator & end) {
|
||||
if (*arg == "--unlock")
|
||||
unlock = true;
|
||||
else if (*arg == "--status")
|
||||
status = true;
|
||||
else
|
||||
return false;
|
||||
return true;
|
||||
|
@ -1503,8 +1582,10 @@ int main(int argc, char * * argv)
|
|||
/* FIXME: need some locking to prevent multiple instances of
|
||||
hydra-queue-runner. */
|
||||
State state;
|
||||
if (unlock)
|
||||
state.clearBusy(0);
|
||||
if (status)
|
||||
state.showStatus();
|
||||
else if (unlock)
|
||||
state.unlock();
|
||||
else
|
||||
state.run();
|
||||
});
|
||||
|
|
|
@ -531,6 +531,12 @@ create rule IdempotentInsert as on insert to FailedPaths
|
|||
#endif
|
||||
|
||||
|
||||
create table SystemStatus (
|
||||
what text primary key not null,
|
||||
status json not null
|
||||
);
|
||||
|
||||
|
||||
-- Cache of the number of finished builds.
|
||||
create table NrBuilds (
|
||||
what text primary key not null,
|
||||
|
|
4
src/sql/upgrade-34.sql
Normal file
4
src/sql/upgrade-34.sql
Normal file
|
@ -0,0 +1,4 @@
|
|||
create table SystemStatus (
|
||||
what text primary key not null,
|
||||
status json not null
|
||||
);
|
Loading…
Reference in a new issue