hydra-queue-runner: Send build notifications

Since our notification plugins are written in Perl, sending
notification from C++ requires a small Perl helper named
‘hydra-notify’.
This commit is contained in:
Eelco Dolstra 2015-06-23 00:14:49 +02:00
parent 5312e1209b
commit a317d24b29
4 changed files with 111 additions and 6 deletions

View file

@ -261,6 +261,14 @@ private:
Sync<std::queue<Path>> logCompressorQueue; Sync<std::queue<Path>> logCompressorQueue;
std::condition_variable_any logCompressorWakeup; std::condition_variable_any logCompressorWakeup;
/* Notification sender work queue. FIXME: if hydra-queue-runner is
killed before it has finished sending notifications about a
build, then the notifications may be lost. It would be better
to mark builds with pending notification in the database. */
typedef std::pair<BuildID, std::vector<BuildID>> NotificationItem;
Sync<std::queue<NotificationItem>> notificationSenderQueue;
std::condition_variable_any notificationSenderWakeup;
public: public:
State(); State();
@ -314,6 +322,10 @@ private:
/* Thread that asynchronously bzips logs of finished steps. */ /* Thread that asynchronously bzips logs of finished steps. */
void logCompressor(); void logCompressor();
/* Thread that asynchronously invokes hydra-notify to send build
notifications. */
void notificationSender();
/* Acquire the global queue runner lock, or null if somebody else /* Acquire the global queue runner lock, or null if somebody else
has it. */ has it. */
std::shared_ptr<PathLocks> acquireGlobalLock(); std::shared_ptr<PathLocks> acquireGlobalLock();
@ -1186,6 +1198,13 @@ bool State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
} }
} }
/* Send notification about this build. */
{
auto notificationSenderQueue_(notificationSenderQueue.lock());
notificationSenderQueue_->push(NotificationItem(build->id, std::vector<BuildID>()));
}
notificationSenderWakeup.notify_one();
/* Wake up any dependent steps that have no other /* Wake up any dependent steps that have no other
dependencies. */ dependencies. */
{ {
@ -1213,6 +1232,8 @@ bool State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
/* Register failure in the database for all Build objects that /* Register failure in the database for all Build objects that
directly or indirectly depend on this step. */ directly or indirectly depend on this step. */
std::vector<BuildID> dependentIDs;
while (true) { while (true) {
/* Get the builds and steps that depend on this step. */ /* Get the builds and steps that depend on this step. */
@ -1273,6 +1294,7 @@ bool State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
for (auto & build2 : indirect) { for (auto & build2 : indirect) {
if (build2->finishedInDB) continue; if (build2->finishedInDB) continue;
printMsg(lvlError, format("marking build %1% as failed") % build2->id); printMsg(lvlError, format("marking build %1% as failed") % build2->id);
dependentIDs.push_back(build2->id);
txn.parameterized txn.parameterized
("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $4, isCachedBuild = $5 where id = $1 and finished = 0") ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $4, isCachedBuild = $5 where id = $1 and finished = 0")
(build2->id) (build2->id)
@ -1301,6 +1323,13 @@ bool State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
} }
} }
/* Send notification about this build and its dependents. */
{
auto notificationSenderQueue_(notificationSenderQueue.lock());
notificationSenderQueue_->push(NotificationItem(build->id, dependentIDs));
}
notificationSenderWakeup.notify_one();
} }
// FIXME: keep stats about aborted steps? // FIXME: keep stats about aborted steps?
@ -1391,7 +1420,7 @@ void State::logCompressor()
if (dup2(fd, STDOUT_FILENO) == -1) if (dup2(fd, STDOUT_FILENO) == -1)
throw SysError("cannot dup output pipe to stdout"); throw SysError("cannot dup output pipe to stdout");
execlp("bzip2", "bzip2", "-c", logPath.c_str(), nullptr); execlp("bzip2", "bzip2", "-c", logPath.c_str(), nullptr);
throw SysError("cannot start ssh"); throw SysError("cannot start bzip2");
}); });
int res = pid.wait(true); int res = pid.wait(true);
@ -1414,6 +1443,44 @@ void State::logCompressor()
} }
void State::notificationSender()
{
while (true) {
try {
NotificationItem item;
{
auto notificationSenderQueue_(notificationSenderQueue.lock());
while (notificationSenderQueue_->empty())
notificationSenderQueue_.wait(notificationSenderWakeup);
item = notificationSenderQueue_->front();
notificationSenderQueue_->pop();
}
printMsg(lvlChatty, format("sending notification about build %1%") % item.first);
Pid pid = startProcess([&]() {
Strings argv({"hydra-notify", "build", int2String(item.first)});
for (auto id : item.second)
argv.push_back(int2String(id));
execvp("hydra-notify", (char * *) stringsToCharPtrs(argv).data()); // FIXME: remove cast
throw SysError("cannot start hydra-notify");
});
int res = pid.wait(true);
if (res != 0)
throw Error(format("hydra-build returned exit code %1% notifying about build %2%")
% res % item.first);
} catch (std::exception & e) {
printMsg(lvlError, format("notification sender: %1%") % e.what());
sleep(5);
}
}
}
std::shared_ptr<PathLocks> State::acquireGlobalLock() std::shared_ptr<PathLocks> State::acquireGlobalLock()
{ {
Path lockPath = hydraData + "/queue-runner"; Path lockPath = hydraData + "/queue-runner";
@ -1580,7 +1647,7 @@ void State::run()
loadMachines(); loadMachines();
auto queueMonitorThread = std::thread(&State::queueMonitor, this); std::thread(&State::queueMonitor, this).detach();
std::thread(&State::dispatcher, this).detach(); std::thread(&State::dispatcher, this).detach();
@ -1588,6 +1655,11 @@ void State::run()
than one. */ than one. */
std::thread(&State::logCompressor, this).detach(); std::thread(&State::logCompressor, this).detach();
/* Idem for notification sending. */
std::thread(&State::notificationSender, this).detach();
/* Monitor the database for status dump requests (e.g. from
hydra-queue-runner --status). */
while (true) { while (true) {
try { try {
auto conn(dbPool.get()); auto conn(dbPool.get());
@ -1601,9 +1673,6 @@ void State::run()
sleep(10); // probably a DB problem, so don't retry right away sleep(10); // probably a DB problem, so don't retry right away
} }
} }
// Never reached.
queueMonitorThread.join();
} }

View file

@ -9,6 +9,7 @@ distributable_scripts = \
hydra-update-gc-roots \ hydra-update-gc-roots \
hydra-s3-backup-collect-garbage \ hydra-s3-backup-collect-garbage \
hydra-create-user \ hydra-create-user \
hydra-notify \
nix-prefetch-git \ nix-prefetch-git \
nix-prefetch-bzr \ nix-prefetch-bzr \
nix-prefetch-hg nix-prefetch-hg

View file

@ -1,4 +1,4 @@
#! /var/run/current-system/sw/bin/perl #! /run/current-system/sw/bin/perl
use strict; use strict;
use utf8; use utf8;

35
src/script/hydra-notify Executable file
View file

@ -0,0 +1,35 @@
#! /run/current-system/sw/bin/perl
use strict;
use utf8;
use Hydra::Plugin;
use Hydra::Helper::Nix;
use Hydra::Helper::PluginHooks;
STDERR->autoflush(1);
binmode STDERR, ":encoding(utf8)";
my $config = getHydraConfig();
my $db = Hydra::Model::DB->new();
my @plugins = Hydra::Plugin->instantiate(db => $db, config => $config);
my $cmd = shift @ARGV or die "Syntax: hydra-notify build BUILD-ID [BUILD-IDs...]\n";
if ($cmd eq "build") {
my $buildId = shift @ARGV or die;
my $build = $db->resultset('Builds')->find($buildId)
or die "build $buildId does not exist\n";
my @dependents;
foreach my $id (@ARGV) {
my $dep = $db->resultset('Builds')->find($id)
or die "build $id does not exist\n";
push @dependents, $dep;
}
notifyBuildFinished(\@plugins, $build, [@dependents]);
}
else {
die "unknown action $cmd";
}