Provide a plugin hook for when build steps finish

Fixes #318.
This commit is contained in:
Eelco Dolstra 2016-05-27 14:32:48 +02:00
parent f70946efca
commit a55942603a
7 changed files with 105 additions and 79 deletions

View file

@ -88,11 +88,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore, Step::ptr step,
for (auto build2 : dependents) { for (auto build2 : dependents) {
if (build2->drvPath == step->drvPath) { if (build2->drvPath == step->drvPath) {
build = build2; build = build2;
{ enqueueNotificationItem({NotificationItem::Type::BuildStarted, build->id});
auto notificationSenderQueue_(notificationSenderQueue.lock());
notificationSenderQueue_->push(NotificationItem{NotificationItem::Type::Started, build->id});
}
notificationSenderWakeup.notify_one();
} }
} }
if (!build) build = *dependents.begin(); if (!build) build = *dependents.begin();
@ -107,7 +103,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore, Step::ptr step,
RemoteResult result; RemoteResult result;
BuildOutput res; BuildOutput res;
int stepNr = 0; unsigned int stepNr = 0;
bool stepFinished = false; bool stepFinished = false;
Finally clearStep([&]() { Finally clearStep([&]() {
@ -169,7 +165,8 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore, Step::ptr step,
if (result.logFile != "") { if (result.logFile != "") {
{ {
auto logCompressorQueue_(logCompressorQueue.lock()); auto logCompressorQueue_(logCompressorQueue.lock());
logCompressorQueue_->push(result.logFile); assert(stepNr);
logCompressorQueue_->push({build->id, stepNr, result.logFile});
} }
logCompressorWakeup.notify_one(); logCompressorWakeup.notify_one();
} }
@ -269,13 +266,8 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore, Step::ptr step,
/* Send notification about the builds that have this step as /* Send notification about the builds that have this step as
the top-level. */ the top-level. */
for (auto id : buildIDs) { for (auto id : buildIDs)
{ enqueueNotificationItem({NotificationItem::Type::BuildFinished, id});
auto notificationSenderQueue_(notificationSenderQueue.lock());
notificationSenderQueue_->push(NotificationItem{NotificationItem::Type::Finished, id});
}
notificationSenderWakeup.notify_one();
}
/* Wake up any dependent steps that have no other /* Wake up any dependent steps that have no other
dependencies. */ dependencies. */
@ -394,7 +386,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore, Step::ptr step,
/* Send notification about this build and its dependents. */ /* Send notification about this build and its dependents. */
{ {
auto notificationSenderQueue_(notificationSenderQueue.lock()); auto notificationSenderQueue_(notificationSenderQueue.lock());
notificationSenderQueue_->push(NotificationItem{NotificationItem::Type::Finished, build->id, dependentIDs}); notificationSenderQueue_->push(NotificationItem{NotificationItem::Type::BuildFinished, build->id, dependentIDs});
} }
notificationSenderWakeup.notify_one(); notificationSenderWakeup.notify_one();

View file

@ -213,7 +213,7 @@ void State::clearBusy(Connection & conn, time_t stopTime)
} }
int State::allocBuildStep(pqxx::work & txn, Build::ptr build) unsigned int State::allocBuildStep(pqxx::work & txn, Build::ptr build)
{ {
/* Acquire an exclusive lock on BuildSteps to ensure that we don't /* Acquire an exclusive lock on BuildSteps to ensure that we don't
race with other threads creating a step of the same build. */ race with other threads creating a step of the same build. */
@ -224,10 +224,10 @@ int State::allocBuildStep(pqxx::work & txn, Build::ptr build)
} }
int State::createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step, unsigned int State::createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step,
const std::string & machine, BuildStatus status, const std::string & errorMsg, BuildID propagatedFrom) const std::string & machine, BuildStatus status, const std::string & errorMsg, BuildID propagatedFrom)
{ {
int stepNr = allocBuildStep(txn, build); unsigned int stepNr = allocBuildStep(txn, build);
txn.parameterized txn.parameterized
("insert into BuildSteps (build, stepnr, type, drvPath, busy, startTime, system, status, propagatedFrom, errorMsg, stopTime, machine) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)") ("insert into BuildSteps (build, stepnr, type, drvPath, busy, startTime, system, status, propagatedFrom, errorMsg, stopTime, machine) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)")
@ -254,7 +254,7 @@ int State::createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build,
void State::finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, unsigned int overhead, void State::finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, unsigned int overhead,
BuildID buildId, int stepNr, const std::string & machine, BuildStatus status, BuildID buildId, unsigned int stepNr, const std::string & machine, BuildStatus status,
const std::string & errorMsg, BuildID propagatedFrom) const std::string & errorMsg, BuildID propagatedFrom)
{ {
assert(startTime); assert(startTime);
@ -420,20 +420,21 @@ void State::logCompressor()
while (true) { while (true) {
try { try {
Path logPath; CompressionItem item;
{ {
auto logCompressorQueue_(logCompressorQueue.lock()); auto logCompressorQueue_(logCompressorQueue.lock());
while (logCompressorQueue_->empty()) while (logCompressorQueue_->empty())
logCompressorQueue_.wait(logCompressorWakeup); logCompressorQueue_.wait(logCompressorWakeup);
logPath = logCompressorQueue_->front(); item = logCompressorQueue_->front();
logCompressorQueue_->pop(); logCompressorQueue_->pop();
} }
if (!pathExists(logPath)) continue; if (!pathExists(item.logPath)) continue;
printMsg(lvlChatty, format("compressing log file %1%") % logPath); printMsg(lvlChatty, format("compressing log file %1%") % item.logPath);
Path tmpPath = logPath + ".bz2.tmp"; Path dstPath = item.logPath + ".bz2";
Path tmpPath = dstPath + ".tmp";
AutoCloseFD fd = open(tmpPath.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0644); AutoCloseFD fd = open(tmpPath.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0644);
@ -442,7 +443,7 @@ void State::logCompressor()
Pid pid = startProcess([&]() { Pid pid = startProcess([&]() {
if (dup2(fd, STDOUT_FILENO) == -1) if (dup2(fd, STDOUT_FILENO) == -1)
throw SysError("cannot dup output pipe to stdout"); throw SysError("cannot dup output pipe to stdout");
execlp("bzip2", "bzip2", "-c", logPath.c_str(), nullptr); execlp("bzip2", "bzip2", "-c", item.logPath.c_str(), nullptr);
throw SysError("cannot start bzip2"); throw SysError("cannot start bzip2");
}); });
@ -450,13 +451,18 @@ void State::logCompressor()
if (res != 0) if (res != 0)
throw Error(format("bzip2 returned exit code %1% while compressing %2%") throw Error(format("bzip2 returned exit code %1% while compressing %2%")
% res % logPath); % res % item.logPath);
if (rename(tmpPath.c_str(), (logPath + ".bz2").c_str()) != 0) if (rename(tmpPath.c_str(), dstPath.c_str()) != 0)
throw SysError(format("renaming %1%") % tmpPath); throw SysError(format("renaming %1%") % tmpPath);
if (unlink(logPath.c_str()) != 0) if (unlink(item.logPath.c_str()) != 0)
throw SysError(format("unlinking %1%") % logPath); throw SysError(format("unlinking %1%") % item.logPath);
/* Run plugins. We do this after log compression to ensure
that the log file doesn't change while the plugins may
be accessing it. */
enqueueNotificationItem({NotificationItem::Type::StepFinished, item.id, {}, item.stepNr, dstPath});
} catch (std::exception & e) { } catch (std::exception & e) {
printMsg(lvlError, format("log compressor: %1%") % e.what()); printMsg(lvlError, format("log compressor: %1%") % e.what());
@ -483,9 +489,22 @@ void State::notificationSender()
printMsg(lvlChatty, format("sending notification about build %1%") % item.id); printMsg(lvlChatty, format("sending notification about build %1%") % item.id);
Pid pid = startProcess([&]() { Pid pid = startProcess([&]() {
Strings argv({"hydra-notify", item.type == NotificationItem::Type::Started ? "build-started" : "build-finished", std::to_string(item.id)}); Strings argv;
switch (item.type) {
case NotificationItem::Type::BuildStarted:
argv = {"hydra-notify", "build-started", std::to_string(item.id)};
for (auto id : item.dependentIds) for (auto id : item.dependentIds)
argv.push_back(std::to_string(id)); argv.push_back(std::to_string(id));
break;
case NotificationItem::Type::BuildFinished:
argv = {"hydra-notify", "build-finished", std::to_string(item.id)};
for (auto id : item.dependentIds)
argv.push_back(std::to_string(id));
break;
case NotificationItem::Type::StepFinished:
argv = {"hydra-notify", "step-finished", std::to_string(item.id), std::to_string(item.stepNr), item.logPath};
break;
};
execvp("hydra-notify", (char * *) stringsToCharPtrs(argv).data()); // FIXME: remove cast execvp("hydra-notify", (char * *) stringsToCharPtrs(argv).data()); // FIXME: remove cast
throw SysError("cannot start hydra-notify"); throw SysError("cannot start hydra-notify");
}); });

View file

@ -311,7 +311,13 @@ private:
counter nrActiveDbUpdates{0}; counter nrActiveDbUpdates{0};
/* Log compressor work queue. */ /* Log compressor work queue. */
nix::Sync<std::queue<nix::Path>> logCompressorQueue; struct CompressionItem
{
BuildID id;
unsigned int stepNr;
nix::Path logPath;
};
nix::Sync<std::queue<CompressionItem>> logCompressorQueue;
std::condition_variable logCompressorWakeup; std::condition_variable logCompressorWakeup;
/* Notification sender work queue. FIXME: if hydra-queue-runner is /* Notification sender work queue. FIXME: if hydra-queue-runner is
@ -321,16 +327,28 @@ private:
struct NotificationItem struct NotificationItem
{ {
enum class Type : char { enum class Type : char {
Started, BuildStarted,
Finished BuildFinished,
StepFinished,
}; };
Type type; Type type;
BuildID id; BuildID id;
std::vector<BuildID> dependentIds; std::vector<BuildID> dependentIds;
unsigned int stepNr;
nix::Path logPath;
}; };
nix::Sync<std::queue<NotificationItem>> notificationSenderQueue; nix::Sync<std::queue<NotificationItem>> notificationSenderQueue;
std::condition_variable notificationSenderWakeup; std::condition_variable notificationSenderWakeup;
void enqueueNotificationItem(const NotificationItem && item)
{
{
auto notificationSenderQueue_(notificationSenderQueue.lock());
notificationSenderQueue_->emplace(item);
}
notificationSenderWakeup.notify_one();
}
/* Specific build to do for --build-one (testing only). */ /* Specific build to do for --build-one (testing only). */
BuildID buildOne; BuildID buildOne;
@ -397,14 +415,14 @@ private:
/* Thread to reload /etc/nix/machines periodically. */ /* Thread to reload /etc/nix/machines periodically. */
void monitorMachinesFile(); void monitorMachinesFile();
int allocBuildStep(pqxx::work & txn, Build::ptr build); unsigned int allocBuildStep(pqxx::work & txn, Build::ptr build);
int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step, unsigned int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step,
const std::string & machine, BuildStatus status, const std::string & errorMsg = "", const std::string & machine, BuildStatus status, const std::string & errorMsg = "",
BuildID propagatedFrom = 0); BuildID propagatedFrom = 0);
void finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, void finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime,
unsigned int overhead, BuildID buildId, int stepNr, unsigned int overhead, BuildID buildId, unsigned int stepNr,
const std::string & machine, BuildStatus status, const std::string & errorMsg = "", const std::string & machine, BuildStatus status, const std::string & errorMsg = "",
BuildID propagatedFrom = 0); BuildID propagatedFrom = 0);

View file

@ -16,7 +16,6 @@ use File::Path;
use File::Temp; use File::Temp;
use File::Spec; use File::Spec;
use File::Slurp; use File::Slurp;
use Hydra::Helper::PluginHooks;
use Hydra::Helper::CatalystUtils; use Hydra::Helper::CatalystUtils;
our @ISA = qw(Exporter); our @ISA = qw(Exporter);

View file

@ -1,35 +0,0 @@
package Hydra::Helper::PluginHooks;
use strict;
use Exporter;
our @ISA = qw(Exporter);
our @EXPORT = qw(
notifyBuildStarted
notifyBuildFinished);
sub notifyBuildStarted {
my ($plugins, $build) = @_;
foreach my $plugin (@{$plugins}) {
eval {
$plugin->buildStarted($build);
};
if ($@) {
print STDERR "$plugin->buildStarted: $@\n";
}
}
}
sub notifyBuildFinished {
my ($plugins, $build, $dependents) = @_;
foreach my $plugin (@{$plugins}) {
eval {
$plugin->buildFinished($build, $dependents);
};
if ($@) {
print STDERR "$plugin->buildFinished: $@\n";
}
}
}
1;

View file

@ -33,6 +33,12 @@ sub buildFinished {
my ($self, $build, $dependents) = @_; my ($self, $build, $dependents) = @_;
} }
# Called when step $step has finished. The build log is stored in the
# file $logPath (bzip2-compressed).
sub stepFinished {
my ($self, $step, $logPath) = @_;
}
# Called to determine the set of supported input types. The plugin # Called to determine the set of supported input types. The plugin
# should add these to the $inputTypes hashref, e.g. $inputTypes{'svn'} # should add these to the $inputTypes hashref, e.g. $inputTypes{'svn'}
# = 'Subversion checkout'. # = 'Subversion checkout'.

View file

@ -4,7 +4,6 @@ use strict;
use utf8; use utf8;
use Hydra::Plugin; use Hydra::Plugin;
use Hydra::Helper::Nix; use Hydra::Helper::Nix;
use Hydra::Helper::PluginHooks;
use Hydra::Helper::AddBuilds; use Hydra::Helper::AddBuilds;
STDERR->autoflush(1); STDERR->autoflush(1);
@ -16,11 +15,12 @@ my $db = Hydra::Model::DB->new();
my @plugins = Hydra::Plugin->instantiate(db => $db, config => $config); my @plugins = Hydra::Plugin->instantiate(db => $db, config => $config);
my $cmd = shift @ARGV or die "Syntax: hydra-notify CMD BUILD-ID [BUILD-IDs...]\n"; my $cmd = shift @ARGV or die "Syntax: hydra-notify build-started BUILD | build-finished BUILD-ID [BUILD-IDs...] | step-finished BUILD-ID STEP-NR LOG-PATH\n";
my $buildId = shift @ARGV or die; my $buildId = shift @ARGV or die;
my $build = $db->resultset('Builds')->find($buildId) my $build = $db->resultset('Builds')->find($buildId)
or die "build $buildId does not exist\n"; or die "build $buildId does not exist\n";
if ($cmd eq "build-finished") { if ($cmd eq "build-finished") {
my $project = $build->project; my $project = $build->project;
my $jobset = $build->jobset; my $jobset = $build->jobset;
@ -33,9 +33,36 @@ if ($cmd eq "build-finished") {
or die "build $id does not exist\n"; or die "build $id does not exist\n";
push @dependents, $dep; push @dependents, $dep;
} }
notifyBuildFinished(\@plugins, $build, [@dependents]);
} elsif ($cmd eq "build-started") { foreach my $plugin (@plugins) {
notifyBuildStarted(\@plugins, $build); eval { $plugin->buildFinished($build, [@dependents]); };
if ($@) {
print STDERR "$plugin->buildFinished: $@\n";
}
}
}
elsif ($cmd eq "build-started") {
foreach my $plugin (@plugins) {
eval { $plugin->buildStarted($build); };
if ($@) {
print STDERR "$plugin->buildStarted: $@\n";
}
}
}
elsif ($cmd eq "step-finished") {
my $stepNr = shift @ARGV or die;
my $step = $build->buildsteps->find({stepnr => $stepNr})
or die "step $stepNr does not exist\n";
my $logPath = shift @ARGV or die;
foreach my $plugin (@plugins) {
eval { $plugin->stepFinished($step, $logPath); };
if ($@) {
print STDERR "$plugin->stepFinished: $@\n";
}
}
} }
else { else {