Warn if PostgreSQL appears stalled

This commit is contained in:
Eelco Dolstra 2016-02-29 15:10:30 +01:00
parent 922dc541c2
commit 7cd08c7c46
5 changed files with 32 additions and 0 deletions

View file

@ -112,6 +112,7 @@ bool State::doBuildStep(nix::ref<Store> destStore, Step::ptr step,
/* Create a build step record indicating that we started /* Create a build step record indicating that we started
building. */ building. */
{ {
auto mc = startDbUpdate();
pqxx::work txn(*conn); pqxx::work txn(*conn);
stepNr = createBuildStep(txn, result.startTime, build, step, machine->sshName, bssBusy); stepNr = createBuildStep(txn, result.startTime, build, step, machine->sshName, bssBusy);
txn.commit(); txn.commit();
@ -165,6 +166,7 @@ bool State::doBuildStep(nix::ref<Store> destStore, Step::ptr step,
retry = step_->tries + 1 < maxTries; retry = step_->tries + 1 < maxTries;
} }
if (retry) { if (retry) {
auto mc = startDbUpdate();
pqxx::work txn(*conn); pqxx::work txn(*conn);
finishBuildStep(txn, result.startTime, result.stopTime, result.overhead, build->id, finishBuildStep(txn, result.startTime, result.stopTime, result.overhead, build->id,
stepNr, machine->sshName, bssAborted, result.errorMsg); stepNr, machine->sshName, bssAborted, result.errorMsg);
@ -213,6 +215,8 @@ bool State::doBuildStep(nix::ref<Store> destStore, Step::ptr step,
/* Update the database. */ /* Update the database. */
{ {
auto mc = startDbUpdate();
pqxx::work txn(*conn); pqxx::work txn(*conn);
finishBuildStep(txn, result.startTime, result.stopTime, result.overhead, finishBuildStep(txn, result.startTime, result.stopTime, result.overhead,
@ -299,6 +303,8 @@ bool State::doBuildStep(nix::ref<Store> destStore, Step::ptr step,
/* Update the database. */ /* Update the database. */
{ {
auto mc = startDbUpdate();
pqxx::work txn(*conn); pqxx::work txn(*conn);
BuildStatus buildStatus = BuildStatus buildStatus =

View file

@ -1,6 +1,7 @@
#pragma once #pragma once
#include <atomic> #include <atomic>
#include <functional>
typedef std::atomic<unsigned long> counter; typedef std::atomic<unsigned long> counter;
@ -8,5 +9,9 @@ struct MaintainCount
{ {
counter & c; counter & c;
MaintainCount(counter & c) : c(c) { c++; } MaintainCount(counter & c) : c(c) { c++; }
/* Increment the counter and pass the freshly incremented value to
   `warn`, so the caller can react (e.g. log) when the number of
   live holders gets too high. The increment and the read happen as
   one atomic pre-increment, so `warn` sees the value produced by
   this holder's own increment. */
MaintainCount(counter & c, std::function<void(unsigned long)> warn) : c(c)
{
const unsigned long current = ++c;
warn(current);
}
~MaintainCount() { auto prev = c--; assert(prev); } ~MaintainCount() { auto prev = c--; assert(prev); }
}; };

View file

@ -45,6 +45,16 @@ State::State()
} }
/* Mark the start of a database update. The returned MaintainCount
   bumps nrActiveDbUpdates now and decrements it again when it is
   destroyed, so callers keep it alive next to their transaction.
   If more than 6 updates are in flight at once, a warning is
   logged, since that many concurrent updates suggests PostgreSQL
   is not keeping up. */
MaintainCount State::startDbUpdate()
{
    auto warnIfStalled = [](unsigned long active) {
        if (active <= 6) return;
        printMsg(lvlError, format("warning: %d concurrent database updates; PostgreSQL may be stalled") % active);
    };
    return MaintainCount(nrActiveDbUpdates, warnIfStalled);
}
ref<Store> State::getLocalStore() ref<Store> State::getLocalStore()
{ {
return ref<Store>(_localStore); return ref<Store>(_localStore);
@ -552,6 +562,7 @@ void State::dumpStatus(Connection & conn, bool log)
root.attr("nrQueueWakeups", nrQueueWakeups); root.attr("nrQueueWakeups", nrQueueWakeups);
root.attr("nrDispatcherWakeups", nrDispatcherWakeups); root.attr("nrDispatcherWakeups", nrDispatcherWakeups);
root.attr("nrDbConnections", dbPool.count()); root.attr("nrDbConnections", dbPool.count());
root.attr("nrActiveDbUpdates", nrActiveDbUpdates);
{ {
root.attr("machines"); root.attr("machines");
JSONObject nested(out); JSONObject nested(out);
@ -661,6 +672,7 @@ void State::dumpStatus(Connection & conn, bool log)
if (log) printMsg(lvlInfo, format("status: %1%") % out.str()); if (log) printMsg(lvlInfo, format("status: %1%") % out.str());
{ {
auto mc = startDbUpdate();
pqxx::work txn(conn); pqxx::work txn(conn);
// FIXME: use PostgreSQL 9.5 upsert. // FIXME: use PostgreSQL 9.5 upsert.
txn.exec("delete from SystemStatus where what = 'queue-runner'"); txn.exec("delete from SystemStatus where what = 'queue-runner'");

View file

@ -124,6 +124,7 @@ bool State::getQueuedBuilds(Connection & conn, ref<Store> localStore,
/* Derivation has been GC'ed prematurely. */ /* Derivation has been GC'ed prematurely. */
printMsg(lvlError, format("aborting GC'ed build %1%") % build->id); printMsg(lvlError, format("aborting GC'ed build %1%") % build->id);
if (!build->finishedInDB) { if (!build->finishedInDB) {
auto mc = startDbUpdate();
pqxx::work txn(conn); pqxx::work txn(conn);
txn.parameterized txn.parameterized
("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1 and finished = 0") ("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1 and finished = 0")
@ -161,10 +162,13 @@ bool State::getQueuedBuilds(Connection & conn, ref<Store> localStore,
Derivation drv = readDerivation(build->drvPath); Derivation drv = readDerivation(build->drvPath);
BuildOutput res = getBuildOutput(destStore, destStore->getFSAccessor(), drv); BuildOutput res = getBuildOutput(destStore, destStore->getFSAccessor(), drv);
{
auto mc = startDbUpdate();
pqxx::work txn(conn); pqxx::work txn(conn);
time_t now = time(0); time_t now = time(0);
markSucceededBuild(txn, build, res, true, now, now); markSucceededBuild(txn, build, res, true, now, now);
txn.commit(); txn.commit();
}
build->finishedInDB = true; build->finishedInDB = true;
@ -178,6 +182,7 @@ bool State::getQueuedBuilds(Connection & conn, ref<Store> localStore,
if (checkCachedFailure(r, conn)) { if (checkCachedFailure(r, conn)) {
printMsg(lvlError, format("marking build %1% as cached failure") % build->id); printMsg(lvlError, format("marking build %1% as cached failure") % build->id);
if (!build->finishedInDB) { if (!build->finishedInDB) {
auto mc = startDbUpdate();
pqxx::work txn(conn); pqxx::work txn(conn);
/* Find the previous build step record, first by /* Find the previous build step record, first by
@ -421,6 +426,7 @@ Step::ptr State::createStep(ref<Store> destStore,
time_t stopTime = time(0); time_t stopTime = time(0);
{ {
auto mc = startDbUpdate();
pqxx::work txn(conn); pqxx::work txn(conn);
createSubstitutionStep(txn, startTime, stopTime, build, drvPath, "out", i.second.path); createSubstitutionStep(txn, startTime, stopTime, build, drvPath, "out", i.second.path);
txn.commit(); txn.commit();

View file

@ -313,6 +313,7 @@ private:
counter nrDispatcherWakeups{0}; counter nrDispatcherWakeups{0};
counter bytesSent{0}; counter bytesSent{0};
counter bytesReceived{0}; counter bytesReceived{0};
counter nrActiveDbUpdates{0};
/* Log compressor work queue. */ /* Log compressor work queue. */
nix::Sync<std::queue<nix::Path>> logCompressorQueue; nix::Sync<std::queue<nix::Path>> logCompressorQueue;
@ -359,6 +360,8 @@ public:
private: private:
MaintainCount startDbUpdate();
/* Return a store object that can access derivations produced by /* Return a store object that can access derivations produced by
hydra-evaluator. */ hydra-evaluator. */
nix::ref<nix::Store> getLocalStore(); nix::ref<nix::Store> getLocalStore();