Make the queue monitor more robust, and better debug output
This commit is contained in:
parent
147eb4fd15
commit
dd104f14ea
1 changed files with 39 additions and 29 deletions
|
@ -76,7 +76,7 @@ struct Build
|
||||||
|
|
||||||
~Build()
|
~Build()
|
||||||
{
|
{
|
||||||
printMsg(lvlError, format("destroying build %1%") % id);
|
printMsg(lvlDebug, format("destroying build %1%") % id);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -110,7 +110,7 @@ struct Step
|
||||||
|
|
||||||
~Step()
|
~Step()
|
||||||
{
|
{
|
||||||
printMsg(lvlError, format("destroying step %1%") % drvPath);
|
printMsg(lvlDebug, format("destroying step %1%") % drvPath);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -223,6 +223,8 @@ public:
|
||||||
|
|
||||||
void queueMonitor();
|
void queueMonitor();
|
||||||
|
|
||||||
|
void queueMonitorLoop();
|
||||||
|
|
||||||
void getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId);
|
void getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId);
|
||||||
|
|
||||||
void removeCancelledBuilds(Connection & conn);
|
void removeCancelledBuilds(Connection & conn);
|
||||||
|
@ -270,7 +272,7 @@ State::State()
|
||||||
State::~State()
|
State::~State()
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
printMsg(lvlError, "clearing active builds / build steps...");
|
printMsg(lvlInfo, "clearing active builds / build steps...");
|
||||||
clearBusy(time(0));
|
clearBusy(time(0));
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
ignoreException();
|
ignoreException();
|
||||||
|
@ -382,6 +384,19 @@ void State::finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime,
|
||||||
|
|
||||||
|
|
||||||
void State::queueMonitor()
|
void State::queueMonitor()
|
||||||
|
{
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
queueMonitorLoop();
|
||||||
|
} catch (std::exception & e) {
|
||||||
|
printMsg(lvlError, format("queue monitor: %1%") % e.what());
|
||||||
|
sleep(10); // probably a DB problem, so don't retry right away
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void State::queueMonitorLoop()
|
||||||
{
|
{
|
||||||
auto conn(dbPool.get());
|
auto conn(dbPool.get());
|
||||||
|
|
||||||
|
@ -417,24 +432,22 @@ void State::queueMonitor()
|
||||||
conn->await_notification();
|
conn->await_notification();
|
||||||
|
|
||||||
if (buildsAdded.get())
|
if (buildsAdded.get())
|
||||||
printMsg(lvlError, "got notification: new builds added to the queue");
|
printMsg(lvlTalkative, "got notification: new builds added to the queue");
|
||||||
if (buildsRestarted.get()) {
|
if (buildsRestarted.get()) {
|
||||||
printMsg(lvlError, "got notification: builds restarted");
|
printMsg(lvlTalkative, "got notification: builds restarted");
|
||||||
lastBuildId = 0; // check all builds
|
lastBuildId = 0; // check all builds
|
||||||
}
|
}
|
||||||
if (buildsCancelled.get()) {
|
if (buildsCancelled.get()) {
|
||||||
printMsg(lvlError, "got notification: builds cancelled");
|
printMsg(lvlTalkative, "got notification: builds cancelled");
|
||||||
removeCancelledBuilds(*conn);
|
removeCancelledBuilds(*conn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
printMsg(lvlError, "queue monitor exits");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId)
|
void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId)
|
||||||
{
|
{
|
||||||
printMsg(lvlError, format("checking the queue for builds > %1%...") % lastBuildId);
|
printMsg(lvlInfo, format("checking the queue for builds > %1%...") % lastBuildId);
|
||||||
|
|
||||||
/* Grab the queued builds from the database, but don't process
|
/* Grab the queued builds from the database, but don't process
|
||||||
them yet (since we don't want a long-running transaction). */
|
them yet (since we don't want a long-running transaction). */
|
||||||
|
@ -467,7 +480,7 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store,
|
||||||
// FIXME: remove build from newBuilds to ensure quick destruction
|
// FIXME: remove build from newBuilds to ensure quick destruction
|
||||||
// FIXME: exception handling
|
// FIXME: exception handling
|
||||||
|
|
||||||
printMsg(lvlInfo, format("loading build %1% (%2%)") % build->id % build->fullJobName);
|
printMsg(lvlTalkative, format("loading build %1% (%2%)") % build->id % build->fullJobName);
|
||||||
|
|
||||||
if (!store->isValidPath(build->drvPath)) {
|
if (!store->isValidPath(build->drvPath)) {
|
||||||
/* Derivation has been GC'ed prematurely. */
|
/* Derivation has been GC'ed prematurely. */
|
||||||
|
@ -492,7 +505,7 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store,
|
||||||
Derivation drv = readDerivation(build->drvPath);
|
Derivation drv = readDerivation(build->drvPath);
|
||||||
BuildResult res = getBuildResult(store, drv);
|
BuildResult res = getBuildResult(store, drv);
|
||||||
|
|
||||||
printMsg(lvlInfo, format("cached build %1%") % build->id);
|
printMsg(lvlInfo, format("marking build %1% as cached successful") % build->id);
|
||||||
|
|
||||||
pqxx::work txn(conn);
|
pqxx::work txn(conn);
|
||||||
time_t now = time(0);
|
time_t now = time(0);
|
||||||
|
@ -524,7 +537,7 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (checkCachedFailure(r, conn)) {
|
if (checkCachedFailure(r, conn)) {
|
||||||
printMsg(lvlError, format("failing build %1% due to previous failure") % build->id);
|
printMsg(lvlError, format("marking build %1% as cached failure") % build->id);
|
||||||
buildStatus = step == r ? bsFailed : bsFailed;
|
buildStatus = step == r ? bsFailed : bsFailed;
|
||||||
buildStepStatus = bssFailed;
|
buildStepStatus = bssFailed;
|
||||||
}
|
}
|
||||||
|
@ -558,7 +571,7 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store,
|
||||||
build->toplevel = step;
|
build->toplevel = step;
|
||||||
}
|
}
|
||||||
|
|
||||||
printMsg(lvlInfo, format("added build %1% (top-level step %2%, %3% new steps, %4% new runnable steps)")
|
printMsg(lvlChatty, format("added build %1% (top-level step %2%, %3% new steps, %4% new runnable steps)")
|
||||||
% build->id % step->drvPath % newSteps.size() % newRunnable.size());
|
% build->id % step->drvPath % newSteps.size() % newRunnable.size());
|
||||||
|
|
||||||
/* Prior to this, the build is not visible to
|
/* Prior to this, the build is not visible to
|
||||||
|
@ -614,7 +627,7 @@ Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPat
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
printMsg(lvlInfo, format("considering derivation ‘%1%’") % drvPath);
|
printMsg(lvlDebug, format("considering derivation ‘%1%’") % drvPath);
|
||||||
|
|
||||||
auto step = std::make_shared<Step>();
|
auto step = std::make_shared<Step>();
|
||||||
step->drvPath = drvPath;
|
step->drvPath = drvPath;
|
||||||
|
@ -639,7 +652,7 @@ Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPat
|
||||||
if (valid) return 0;
|
if (valid) return 0;
|
||||||
|
|
||||||
/* No, we need to build. */
|
/* No, we need to build. */
|
||||||
printMsg(lvlInfo, format("creating build step ‘%1%’") % drvPath);
|
printMsg(lvlDebug, format("creating build step ‘%1%’") % drvPath);
|
||||||
|
|
||||||
/* Create steps for the dependencies. */
|
/* Create steps for the dependencies. */
|
||||||
bool hasDeps = false;
|
bool hasDeps = false;
|
||||||
|
@ -671,7 +684,7 @@ void State::destroyStep(Step::ptr step, bool proceed)
|
||||||
if (step->destroyed) return;
|
if (step->destroyed) return;
|
||||||
step->destroyed = true;
|
step->destroyed = true;
|
||||||
|
|
||||||
printMsg(lvlInfo, format("destroying build step ‘%1%’") % step->drvPath);
|
printMsg(lvlDebug, format("destroying build step ‘%1%’") % step->drvPath);
|
||||||
|
|
||||||
{
|
{
|
||||||
auto steps_(steps.lock());
|
auto steps_(steps.lock());
|
||||||
|
@ -756,7 +769,7 @@ std::set<Build::ptr> State::getDependentBuilds(Step::ptr step)
|
||||||
|
|
||||||
void State::makeRunnable(Step::ptr step)
|
void State::makeRunnable(Step::ptr step)
|
||||||
{
|
{
|
||||||
printMsg(lvlInfo, format("step ‘%1%’ is now runnable") % step->drvPath);
|
printMsg(lvlChatty, format("step ‘%1%’ is now runnable") % step->drvPath);
|
||||||
|
|
||||||
{
|
{
|
||||||
auto step_(step->state.lock());
|
auto step_(step->state.lock());
|
||||||
|
@ -775,11 +788,11 @@ void State::makeRunnable(Step::ptr step)
|
||||||
void State::dispatcher()
|
void State::dispatcher()
|
||||||
{
|
{
|
||||||
while (true) {
|
while (true) {
|
||||||
printMsg(lvlError, "dispatcher woken up");
|
printMsg(lvlDebug, "dispatcher woken up");
|
||||||
|
|
||||||
{
|
{
|
||||||
auto runnable_(runnable.lock());
|
auto runnable_(runnable.lock());
|
||||||
printMsg(lvlError, format("%1% runnable builds") % runnable_->size());
|
printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size());
|
||||||
|
|
||||||
/* FIXME: we're holding the runnable lock too long
|
/* FIXME: we're holding the runnable lock too long
|
||||||
here. This could be more efficient. */
|
here. This could be more efficient. */
|
||||||
|
@ -795,13 +808,11 @@ void State::dispatcher()
|
||||||
|
|
||||||
auto reservation = findMachine(step);
|
auto reservation = findMachine(step);
|
||||||
if (!reservation) {
|
if (!reservation) {
|
||||||
printMsg(lvlError, format("cannot execute step ‘%1%’ right now") % step->drvPath);
|
printMsg(lvlDebug, format("cannot execute step ‘%1%’ right now") % step->drvPath);
|
||||||
++i;
|
++i;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
printMsg(lvlInfo, format("WOOHOO: starting step ‘%1%’ on machine ‘%2%’")
|
|
||||||
% step->drvPath % reservation->machine->sshName);
|
|
||||||
i = runnable_->erase(i);
|
i = runnable_->erase(i);
|
||||||
|
|
||||||
auto builderThread = std::thread(&State::builder, this, step, reservation);
|
auto builderThread = std::thread(&State::builder, this, step, reservation);
|
||||||
|
@ -863,8 +874,6 @@ void State::builder(Step::ptr step, MachineReservation::ptr reservation)
|
||||||
assert(reservation.unique());
|
assert(reservation.unique());
|
||||||
reservation = 0;
|
reservation = 0;
|
||||||
wakeDispatcher();
|
wakeDispatcher();
|
||||||
|
|
||||||
printMsg(lvlError, "builder exits");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -900,7 +909,8 @@ void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
|
||||||
|
|
||||||
if (!build) build = *dependents.begin();
|
if (!build) build = *dependents.begin();
|
||||||
|
|
||||||
printMsg(lvlInfo, format("performing build step ‘%1%’ (needed by %2% builds)") % step->drvPath % dependents.size());
|
printMsg(lvlInfo, format("performing step ‘%1%’ on ‘%2%’ (needed by %3% builds)")
|
||||||
|
% step->drvPath % machine->sshName % dependents.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
auto conn(dbPool.get());
|
auto conn(dbPool.get());
|
||||||
|
@ -934,7 +944,7 @@ void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
|
||||||
result.status = RemoteResult::rrMiscFailure;
|
result.status = RemoteResult::rrMiscFailure;
|
||||||
result.errorMsg = e.msg();
|
result.errorMsg = e.msg();
|
||||||
printMsg(lvlError, format("ERROR: %1%") % e.msg());
|
printMsg(lvlError, format("ERROR: %1%") % e.msg());
|
||||||
abort();
|
abort(); // FIXME
|
||||||
}
|
}
|
||||||
|
|
||||||
if (result.status == RemoteResult::rrSuccess) res = getBuildResult(store, step->drv);
|
if (result.status == RemoteResult::rrSuccess) res = getBuildResult(store, step->drv);
|
||||||
|
@ -1042,7 +1052,7 @@ void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
|
||||||
void State::markSucceededBuild(pqxx::work & txn, Build::ptr build,
|
void State::markSucceededBuild(pqxx::work & txn, Build::ptr build,
|
||||||
const BuildResult & res, bool isCachedBuild, time_t startTime, time_t stopTime)
|
const BuildResult & res, bool isCachedBuild, time_t startTime, time_t stopTime)
|
||||||
{
|
{
|
||||||
printMsg(lvlError, format("marking build %1% as succeeded") % build->id);
|
printMsg(lvlInfo, format("marking build %1% as succeeded") % build->id);
|
||||||
|
|
||||||
txn.parameterized
|
txn.parameterized
|
||||||
("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $4, size = $5, closureSize = $6, releaseName = $7, isCachedBuild = $8 where id = $1")
|
("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $4, size = $5, closureSize = $6, releaseName = $7, isCachedBuild = $8 where id = $1")
|
||||||
|
@ -1097,8 +1107,8 @@ void State::run()
|
||||||
|
|
||||||
queueMonitorThread.join();
|
queueMonitorThread.join();
|
||||||
|
|
||||||
printMsg(lvlError, "exiting...");
|
//printMsg(lvlInfo, "exiting...");
|
||||||
printMsg(lvlError, format("psql connections = %1%") % dbPool.count());
|
//printMsg(lvlInfo, format("psql connections = %1%") % dbPool.count());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue