Turn hydra-notify into a daemon

It now receives notifications about started/finished builds/steps via
PostgreSQL. This gets rid of the (substantial) overhead of starting
hydra-notify for every event. It also allows other programs (even on
other machines) to listen to Hydra notifications.
This commit is contained in:
Eelco Dolstra 2019-08-09 19:11:38 +02:00
parent f13a2cb6dc
commit 2946899504
No known key found for this signature in database
GPG key ID: 8170B4726D7198DE
5 changed files with 105 additions and 174 deletions

View file

@ -99,6 +99,8 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
unsigned int maxSilentTime, buildTimeout; unsigned int maxSilentTime, buildTimeout;
unsigned int repeats = step->isDeterministic ? 1 : 0; unsigned int repeats = step->isDeterministic ? 1 : 0;
auto conn(dbPool.get());
{ {
std::set<Build::ptr> dependents; std::set<Build::ptr> dependents;
std::set<Step::ptr> steps; std::set<Step::ptr> steps;
@ -122,8 +124,10 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
for (auto build2 : dependents) { for (auto build2 : dependents) {
if (build2->drvPath == step->drvPath) { if (build2->drvPath == step->drvPath) {
build = build2; build = build2;
enqueueNotificationItem({NotificationItem::Type::BuildStarted, build->id}); pqxx::work txn(*conn);
notifyBuildStarted(txn, build->id);
txn.commit();
} }
{ {
auto i = jobsetRepeats.find(std::make_pair(build2->projectName, build2->jobsetName)); auto i = jobsetRepeats.find(std::make_pair(build2->projectName, build2->jobsetName));
@ -144,8 +148,6 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
bool quit = buildId == buildOne && step->drvPath == buildDrvPath; bool quit = buildId == buildOne && step->drvPath == buildDrvPath;
auto conn(dbPool.get());
RemoteResult result; RemoteResult result;
BuildOutput res; BuildOutput res;
unsigned int stepNr = 0; unsigned int stepNr = 0;
@ -170,11 +172,6 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
} catch (...) { } catch (...) {
ignoreException(); ignoreException();
} }
/* Asynchronously run plugins. FIXME: if we're killed,
plugin actions might not be run. Need to ensure
at-least-once semantics. */
enqueueNotificationItem({NotificationItem::Type::StepFinished, buildId, {}, stepNr, result.logFile});
} }
}); });
@ -342,8 +339,12 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
/* Send notification about the builds that have this step as /* Send notification about the builds that have this step as
the top-level. */ the top-level. */
for (auto id : buildIDs) {
enqueueNotificationItem({NotificationItem::Type::BuildFinished, id}); pqxx::work txn(*conn);
for (auto id : buildIDs)
notifyBuildFinished(txn, id, {});
txn.commit();
}
/* Wake up any dependent steps that have no other /* Wake up any dependent steps that have no other
dependencies. */ dependencies. */
@ -462,11 +463,10 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
/* Send notification about this build and its dependents. */ /* Send notification about this build and its dependents. */
{ {
auto notificationSenderQueue_(notificationSenderQueue.lock()); pqxx::work txn(*conn);
notificationSenderQueue_->push(NotificationItem{NotificationItem::Type::BuildFinished, buildId, dependentIDs}); notifyBuildFinished(txn, buildId, dependentIDs);
txn.commit();
} }
notificationSenderWakeup.notify_one();
} }
// FIXME: keep stats about aborted steps? // FIXME: keep stats about aborted steps?

View file

@ -299,6 +299,9 @@ void State::finishBuildStep(pqxx::work & txn, const RemoteResult & result,
(result.timesBuilt, result.timesBuilt > 0) (result.timesBuilt, result.timesBuilt > 0)
(result.isNonDeterministic, result.timesBuilt > 1) (result.isNonDeterministic, result.timesBuilt > 1)
.exec(); .exec();
assert(result.logFile.find('\'') == std::string::npos);
txn.exec(fmt("notify step_finished, '%d %d %s'", buildId, stepNr,
result.logFile.empty() ? "-" : result.logFile));
} }
@ -450,74 +453,20 @@ bool State::checkCachedFailure(Step::ptr step, Connection & conn)
} }
void State::notificationSender() void State::notifyBuildStarted(pqxx::work & txn, BuildID buildId)
{ {
while (true) { txn.exec(fmt("notify build_started, '%s'", buildId));
try { }
NotificationItem item;
{
auto notificationSenderQueue_(notificationSenderQueue.lock());
while (notificationSenderQueue_->empty())
notificationSenderQueue_.wait(notificationSenderWakeup);
item = notificationSenderQueue_->front();
notificationSenderQueue_->pop();
}
MaintainCount<counter> mc(nrNotificationsInProgress); void State::notifyBuildFinished(pqxx::work & txn, BuildID buildId,
const std::vector<BuildID> & dependentIds)
printMsg(lvlChatty, format("sending notification about build %1%") % item.id); {
auto payload = fmt("%d ", buildId);
auto now1 = std::chrono::steady_clock::now(); for (auto & d : dependentIds)
payload += fmt("%d ", d);
Pid pid = startProcess([&]() { // FIXME: apparently parameterized() doesn't support NOTIFY.
Strings argv; txn.exec(fmt("notify build_finished, '%s'", payload));
switch (item.type) {
case NotificationItem::Type::BuildStarted:
argv = {"hydra-notify", "build-started", std::to_string(item.id)};
for (auto id : item.dependentIds)
argv.push_back(std::to_string(id));
break;
case NotificationItem::Type::BuildFinished:
argv = {"hydra-notify", "build-finished", std::to_string(item.id)};
for (auto id : item.dependentIds)
argv.push_back(std::to_string(id));
break;
case NotificationItem::Type::StepFinished:
argv = {"hydra-notify", "step-finished", std::to_string(item.id), std::to_string(item.stepNr), item.logPath};
break;
};
printMsg(lvlChatty, "Executing hydra-notify " + concatStringsSep(" ", argv));
execvp("hydra-notify", (char * *) stringsToCharPtrs(argv).data()); // FIXME: remove cast
throw SysError("cannot start hydra-notify");
});
int res = pid.wait();
if (!statusOk(res))
throw Error("notification about build %d failed: %s", item.id, statusToString(res));
auto now2 = std::chrono::steady_clock::now();
if (item.type == NotificationItem::Type::BuildFinished) {
auto conn(dbPool.get());
pqxx::work txn(*conn);
txn.parameterized
("update Builds set notificationPendingSince = null where id = $1")
(item.id)
.exec();
txn.commit();
}
nrNotificationTimeMs += std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
nrNotificationsDone++;
} catch (std::exception & e) {
nrNotificationsFailed++;
printMsg(lvlError, format("notification sender: %1%") % e.what());
sleep(5);
}
}
} }
@ -589,13 +538,6 @@ void State::dumpStatus(Connection & conn, bool log)
root.attr("nrDbConnections", dbPool.count()); root.attr("nrDbConnections", dbPool.count());
root.attr("nrActiveDbUpdates", nrActiveDbUpdates); root.attr("nrActiveDbUpdates", nrActiveDbUpdates);
root.attr("memoryTokensInUse", memoryTokens.currentUse()); root.attr("memoryTokensInUse", memoryTokens.currentUse());
root.attr("nrNotificationsDone", nrNotificationsDone);
root.attr("nrNotificationsFailed", nrNotificationsFailed);
root.attr("nrNotificationsInProgress", nrNotificationsInProgress);
root.attr("nrNotificationsPending", notificationSenderQueue.lock()->size());
root.attr("nrNotificationTimeMs", nrNotificationTimeMs);
uint64_t nrNotificationsTotal = nrNotificationsDone + nrNotificationsFailed;
root.attr("nrNotificationTimeAvgMs", nrNotificationsTotal == 0 ? 0.0 : (float) nrNotificationTimeMs / nrNotificationsTotal);
{ {
auto nested = root.object("machines"); auto nested = root.object("machines");
@ -843,24 +785,6 @@ void State::run(BuildID buildOne)
std::thread(&State::dispatcher, this).detach(); std::thread(&State::dispatcher, this).detach();
/* Idem for notification sending. */
auto maxConcurrentNotifications = config->getIntOption("max-concurrent-notifications", 2);
for (uint64_t i = 0; i < maxConcurrentNotifications; ++i)
std::thread(&State::notificationSender, this).detach();
/* Enqueue notification items for builds that were finished
previously, but for which we didn't manage to send
notifications. */
{
auto conn(dbPool.get());
pqxx::work txn(*conn);
auto res = txn.parameterized("select id from Builds where notificationPendingSince > 0").exec();
for (auto const & row : res) {
auto id = row["id"].as<BuildID>();
enqueueNotificationItem({NotificationItem::Type::BuildFinished, id});
}
}
/* Periodically clean up orphaned busy steps in the database. */ /* Periodically clean up orphaned busy steps in the database. */
std::thread([&]() { std::thread([&]() {
while (true) { while (true) {

View file

@ -193,13 +193,12 @@ bool State::getQueuedBuilds(Connection & conn,
(build->id) (build->id)
((int) (ex.step->drvPath == build->drvPath ? bsFailed : bsDepFailed)) ((int) (ex.step->drvPath == build->drvPath ? bsFailed : bsDepFailed))
(time(0)).exec(); (time(0)).exec();
notifyBuildFinished(txn, build->id, {});
txn.commit(); txn.commit();
build->finishedInDB = true; build->finishedInDB = true;
nrBuildsDone++; nrBuildsDone++;
} }
enqueueNotificationItem({NotificationItem::Type::BuildFinished, build->id});
return; return;
} }
@ -230,13 +229,12 @@ bool State::getQueuedBuilds(Connection & conn,
time_t now = time(0); time_t now = time(0);
printMsg(lvlInfo, format("marking build %1% as succeeded (cached)") % build->id); printMsg(lvlInfo, format("marking build %1% as succeeded (cached)") % build->id);
markSucceededBuild(txn, build, res, true, now, now); markSucceededBuild(txn, build, res, true, now, now);
notifyBuildFinished(txn, build->id, {});
txn.commit(); txn.commit();
} }
build->finishedInDB = true; build->finishedInDB = true;
enqueueNotificationItem({NotificationItem::Type::BuildFinished, build->id});
return; return;
} }

View file

@ -347,39 +347,6 @@ private:
counter bytesSent{0}; counter bytesSent{0};
counter bytesReceived{0}; counter bytesReceived{0};
counter nrActiveDbUpdates{0}; counter nrActiveDbUpdates{0};
counter nrNotificationsDone{0};
counter nrNotificationsFailed{0};
counter nrNotificationsInProgress{0};
counter nrNotificationTimeMs{0};
/* Notification sender work queue. FIXME: if hydra-queue-runner is
killed before it has finished sending notifications about a
build, then the notifications may be lost. It would be better
to mark builds with pending notification in the database. */
struct NotificationItem
{
enum class Type : char {
BuildStarted,
BuildFinished,
StepFinished,
};
Type type;
BuildID id;
std::vector<BuildID> dependentIds;
unsigned int stepNr;
nix::Path logPath;
};
nix::Sync<std::queue<NotificationItem>> notificationSenderQueue;
std::condition_variable notificationSenderWakeup;
void enqueueNotificationItem(const NotificationItem && item)
{
{
auto notificationSenderQueue_(notificationSenderQueue.lock());
notificationSenderQueue_->emplace(item);
}
notificationSenderWakeup.notify_one();
}
/* Specific build to do for --build-one (testing only). */ /* Specific build to do for --build-one (testing only). */
BuildID buildOne; BuildID buildOne;
@ -540,9 +507,10 @@ private:
bool checkCachedFailure(Step::ptr step, Connection & conn); bool checkCachedFailure(Step::ptr step, Connection & conn);
/* Thread that asynchronously invokes hydra-notify to send build void notifyBuildStarted(pqxx::work & txn, BuildID buildId);
notifications. */
void notificationSender(); void notifyBuildFinished(pqxx::work & txn, BuildID buildId,
const std::vector<BuildID> & dependentIds);
/* Acquire the global queue runner lock, or null if somebody else /* Acquire the global queue runner lock, or null if somebody else
has it. */ has it. */

View file

@ -5,6 +5,7 @@ use utf8;
use Hydra::Plugin; use Hydra::Plugin;
use Hydra::Helper::Nix; use Hydra::Helper::Nix;
use Hydra::Helper::AddBuilds; use Hydra::Helper::AddBuilds;
use IO::Select;
STDERR->autoflush(1); STDERR->autoflush(1);
binmode STDERR, ":encoding(utf8)"; binmode STDERR, ":encoding(utf8)";
@ -15,20 +16,37 @@ my $db = Hydra::Model::DB->new();
my @plugins = Hydra::Plugin->instantiate(db => $db, config => $config); my @plugins = Hydra::Plugin->instantiate(db => $db, config => $config);
my $cmd = shift @ARGV or die "Syntax: hydra-notify build-started BUILD | build-finished BUILD-ID [BUILD-IDs...] | step-finished BUILD-ID STEP-NR LOG-PATH\n"; my $dbh = $db->storage->dbh;
my $buildId = shift @ARGV or die; $dbh->do("listen build_started");
my $build = $db->resultset('Builds')->find($buildId) $dbh->do("listen build_finished");
or die "build $buildId does not exist\n"; $dbh->do("listen step_finished");
sub buildStarted {
my ($buildId) = @_;
my $build = $db->resultset('Builds')->find($buildId)
or die "build $buildId does not exist\n";
foreach my $plugin (@plugins) {
eval { $plugin->buildStarted($build); };
if ($@) {
print STDERR "$plugin->buildStarted: $@\n";
}
}
}
sub buildFinished {
my ($build, @deps) = @_;
if ($cmd eq "build-finished") {
my $project = $build->project; my $project = $build->project;
my $jobset = $build->jobset; my $jobset = $build->jobset;
if (length($project->declfile) && $jobset->name eq ".jobsets" && $build->iscurrent) { if (length($project->declfile) && $jobset->name eq ".jobsets" && $build->iscurrent) {
handleDeclarativeJobsetBuild($db, $project, $build); handleDeclarativeJobsetBuild($db, $project, $build);
} }
my @dependents; my @dependents;
foreach my $id (@ARGV) { foreach my $id (@deps) {
my $dep = $db->resultset('Builds')->find($id) my $dep = $db->resultset('Builds')->find($id)
or die "build $id does not exist\n"; or die "build $id does not exist\n";
push @dependents, $dep; push @dependents, $dep;
@ -40,33 +58,20 @@ if ($cmd eq "build-finished") {
print STDERR "$plugin->buildFinished: $@\n"; print STDERR "$plugin->buildFinished: $@\n";
} }
} }
$build->update({ notificationpendingsince => undef });
} }
elsif ($cmd eq "build-queued") { sub stepFinished {
foreach my $plugin (@plugins) { my ($buildId, $stepNr, $logPath) = @_;
eval { $plugin->buildQueued($build); };
if ($@) {
print STDERR "$plugin->buildQueued: $@\n";
}
}
}
elsif ($cmd eq "build-started") { my $build = $db->resultset('Builds')->find($buildId)
foreach my $plugin (@plugins) { or die "build $buildId does not exist\n";
eval { $plugin->buildStarted($build); };
if ($@) {
print STDERR "$plugin->buildStarted: $@\n";
}
}
}
elsif ($cmd eq "step-finished") {
die if scalar @ARGV < 2;
my $stepNr = shift @ARGV;
my $step = $build->buildsteps->find({stepnr => $stepNr}) my $step = $build->buildsteps->find({stepnr => $stepNr})
or die "step $stepNr does not exist\n"; or die "step $stepNr does not exist\n";
my $logPath = shift @ARGV;
$logPath = undef if $logPath eq ""; $logPath = undef if $logPath eq "-";
foreach my $plugin (@plugins) { foreach my $plugin (@plugins) {
eval { $plugin->stepFinished($step, $logPath); }; eval { $plugin->stepFinished($step, $logPath); };
@ -76,6 +81,42 @@ elsif ($cmd eq "step-finished") {
} }
} }
else { # Process builds that finished while hydra-notify wasn't running.
die "unknown action $cmd"; for my $build ($db->resultset('Builds')->search(
{ notificationpendingsince => { '!=', undef } }))
{
my $buildId = $build->id;
print STDERR "sending notifications for build ${\$buildId}...\n";
buildFinished($build);
}
# Process incoming notifications.
my $fd = $dbh->func("getfd");
my $sel = IO::Select->new($fd);
while (1) {
$sel->can_read;
my $notify = $dbh->func("pg_notifies");
next if !$notify;
my ($channelName, $pid, $payload) = @$notify;
#print STDERR "got '$channelName' from $pid: $payload\n";
my @payload = split / /, $payload;
eval {
if ($channelName eq "build_started") {
buildStarted(int($payload[0]));
} elsif ($channelName eq "build_finished") {
my $buildId = int($payload[0]);
my $build = $db->resultset('Builds')->find($buildId)
or die "build $buildId does not exist\n";
buildFinished($build, @payload[1..$#payload]);
} elsif ($channelName eq "step_finished") {
stepFinished(int($payload[0]), int($payload[1]));
}
};
if ($@) {
print STDERR "error processing message '$payload' on channel '$channelName': $@\n";
}
} }