forked from lix-project/hydra
hydra-queue-runner: Make build notification more reliable
Previously, when hydra-queue-runner was restarted, any pending "build finished" notifications were lost. Now hydra-queue-runner marks finished but unnotified builds in the database and uses that to run pending notifications at startup.
This commit is contained in:
parent
5ee74dd3a0
commit
7c976d2aec
|
@ -418,7 +418,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
|
||||||
if (build2->finishedInDB) continue;
|
if (build2->finishedInDB) continue;
|
||||||
printMsg(lvlError, format("marking build %1% as failed") % build2->id);
|
printMsg(lvlError, format("marking build %1% as failed") % build2->id);
|
||||||
txn.parameterized
|
txn.parameterized
|
||||||
("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $4, isCachedBuild = $5 where id = $1 and finished = 0")
|
("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $4, isCachedBuild = $5, notificationPendingSince = $4 where id = $1 and finished = 0")
|
||||||
(build2->id)
|
(build2->id)
|
||||||
((int) (build2->drvPath != step->drvPath && result.buildStatus() == bsFailed ? bsDepFailed : result.buildStatus()))
|
((int) (build2->drvPath != step->drvPath && result.buildStatus() == bsFailed ? bsDepFailed : result.buildStatus()))
|
||||||
(result.startTime)
|
(result.startTime)
|
||||||
|
|
|
@ -414,7 +414,7 @@ void State::markSucceededBuild(pqxx::work & txn, Build::ptr build,
|
||||||
if (txn.parameterized("select 1 from Builds where id = $1 and finished = 0")(build->id).exec().empty()) return;
|
if (txn.parameterized("select 1 from Builds where id = $1 and finished = 0")(build->id).exec().empty()) return;
|
||||||
|
|
||||||
txn.parameterized
|
txn.parameterized
|
||||||
("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $4, size = $5, closureSize = $6, releaseName = $7, isCachedBuild = $8 where id = $1")
|
("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $4, size = $5, closureSize = $6, releaseName = $7, isCachedBuild = $8, notificationPendingSince = $4 where id = $1")
|
||||||
(build->id)
|
(build->id)
|
||||||
((int) (res.failed ? bsFailedWithOutput : bsSuccess))
|
((int) (res.failed ? bsFailedWithOutput : bsSuccess))
|
||||||
(startTime)
|
(startTime)
|
||||||
|
@ -519,6 +519,16 @@ void State::notificationSender()
|
||||||
|
|
||||||
auto now2 = std::chrono::steady_clock::now();
|
auto now2 = std::chrono::steady_clock::now();
|
||||||
|
|
||||||
|
if (item.type == NotificationItem::Type::BuildFinished) {
|
||||||
|
auto conn(dbPool.get());
|
||||||
|
pqxx::work txn(*conn);
|
||||||
|
txn.parameterized
|
||||||
|
("update Builds set notificationPendingSince = null where id = $1")
|
||||||
|
(item.id)
|
||||||
|
.exec();
|
||||||
|
txn.commit();
|
||||||
|
}
|
||||||
|
|
||||||
nrNotificationTimeMs += std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
|
nrNotificationTimeMs += std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
|
||||||
nrNotificationsDone++;
|
nrNotificationsDone++;
|
||||||
|
|
||||||
|
@ -837,6 +847,19 @@ void State::run(BuildID buildOne)
|
||||||
for (uint64_t i = 0; i < maxConcurrentNotifications; ++i)
|
for (uint64_t i = 0; i < maxConcurrentNotifications; ++i)
|
||||||
std::thread(&State::notificationSender, this).detach();
|
std::thread(&State::notificationSender, this).detach();
|
||||||
|
|
||||||
|
/* Enqueue notification items for builds that were finished
|
||||||
|
previously, but for which we didn't manage to send
|
||||||
|
notifications. */
|
||||||
|
{
|
||||||
|
auto conn(dbPool.get());
|
||||||
|
pqxx::work txn(*conn);
|
||||||
|
auto res = txn.parameterized("select id from Builds where notificationPendingSince > 0").exec();
|
||||||
|
for (auto const & row : res) {
|
||||||
|
auto id = row["id"].as<BuildID>();
|
||||||
|
enqueueNotificationItem({NotificationItem::Type::BuildFinished, id});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Periodically clean up orphaned busy steps in the database. */
|
/* Periodically clean up orphaned busy steps in the database. */
|
||||||
std::thread([&]() {
|
std::thread([&]() {
|
||||||
while (true) {
|
while (true) {
|
||||||
|
|
|
@ -188,7 +188,8 @@ bool State::getQueuedBuilds(Connection & conn,
|
||||||
|
|
||||||
createBuildStep(txn, 0, build->id, ex.step, "", bsCachedFailure, "", propagatedFrom);
|
createBuildStep(txn, 0, build->id, ex.step, "", bsCachedFailure, "", propagatedFrom);
|
||||||
txn.parameterized
|
txn.parameterized
|
||||||
("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $3, isCachedBuild = 1 where id = $1 and finished = 0")
|
("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $3, isCachedBuild = 1, notificationPendingSince = $3 "
|
||||||
|
"where id = $1 and finished = 0")
|
||||||
(build->id)
|
(build->id)
|
||||||
((int) (ex.step->drvPath == build->drvPath ? bsFailed : bsDepFailed))
|
((int) (ex.step->drvPath == build->drvPath ? bsFailed : bsDepFailed))
|
||||||
(time(0)).exec();
|
(time(0)).exec();
|
||||||
|
|
|
@ -222,12 +222,14 @@ create table Builds (
|
||||||
|
|
||||||
keep integer not null default 0, -- true means never garbage-collect the build output
|
keep integer not null default 0, -- true means never garbage-collect the build output
|
||||||
|
|
||||||
|
notificationPendingSince integer,
|
||||||
|
|
||||||
check (finished = 0 or (stoptime is not null and stoptime != 0)),
|
check (finished = 0 or (stoptime is not null and stoptime != 0)),
|
||||||
check (finished = 0 or (starttime is not null and starttime != 0)),
|
check (finished = 0 or (starttime is not null and starttime != 0)),
|
||||||
|
|
||||||
foreign key (project) references Projects(name) on update cascade,
|
foreign key (project) references Projects(name) on update cascade,
|
||||||
foreign key (project, jobset) references Jobsets(project, name) on update cascade,
|
foreign key (project, jobset) references Jobsets(project, name) on update cascade,
|
||||||
foreign key (project, jobset, job) references Jobs(project, jobset, name) on update cascade
|
foreign key (project, jobset, job) references Jobs(project, jobset, name) on update cascade
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
||||||
|
@ -677,3 +679,5 @@ create index IndexBuildsOnKeep on Builds(keep) where keep = 1;
|
||||||
|
|
||||||
-- To get the most recent eval for a jobset.
|
-- To get the most recent eval for a jobset.
|
||||||
create index IndexJobsetEvalsOnJobsetId on JobsetEvals(project, jobset, id desc) where hasNewBuilds = 1;
|
create index IndexJobsetEvalsOnJobsetId on JobsetEvals(project, jobset, id desc) where hasNewBuilds = 1;
|
||||||
|
|
||||||
|
create index IndexBuildsOnNotificationPendingSince on Builds(notificationPendingSince) where notificationPendingSince is not null;
|
||||||
|
|
3
src/sql/upgrade-55.sql
Normal file
3
src/sql/upgrade-55.sql
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
alter table Builds add column notificationPendingSince integer;
|
||||||
|
|
||||||
|
create index IndexBuildsOnNotificationPendingSince on Builds(notificationPendingSince) where notificationPendingSince is not null;
|
Loading…
Reference in a new issue