diff --git a/src/hydra-queue-runner/builder.cc b/src/hydra-queue-runner/builder.cc index d670bd2d..b0e11151 100644 --- a/src/hydra-queue-runner/builder.cc +++ b/src/hydra-queue-runner/builder.cc @@ -88,11 +88,7 @@ State::StepResult State::doBuildStep(nix::ref destStore, Step::ptr step, for (auto build2 : dependents) { if (build2->drvPath == step->drvPath) { build = build2; - { - auto notificationSenderQueue_(notificationSenderQueue.lock()); - notificationSenderQueue_->push(NotificationItem{NotificationItem::Type::Started, build->id}); - } - notificationSenderWakeup.notify_one(); + enqueueNotificationItem({NotificationItem::Type::BuildStarted, build->id}); } } if (!build) build = *dependents.begin(); @@ -107,7 +103,7 @@ State::StepResult State::doBuildStep(nix::ref destStore, Step::ptr step, RemoteResult result; BuildOutput res; - int stepNr = 0; + unsigned int stepNr = 0; bool stepFinished = false; Finally clearStep([&]() { @@ -169,7 +165,8 @@ State::StepResult State::doBuildStep(nix::ref destStore, Step::ptr step, if (result.logFile != "") { { auto logCompressorQueue_(logCompressorQueue.lock()); - logCompressorQueue_->push(result.logFile); + assert(stepNr); + logCompressorQueue_->push({build->id, stepNr, result.logFile}); } logCompressorWakeup.notify_one(); } @@ -269,13 +266,8 @@ State::StepResult State::doBuildStep(nix::ref destStore, Step::ptr step, /* Send notification about the builds that have this step as the top-level. */ - for (auto id : buildIDs) { - { - auto notificationSenderQueue_(notificationSenderQueue.lock()); - notificationSenderQueue_->push(NotificationItem{NotificationItem::Type::Finished, id}); - } - notificationSenderWakeup.notify_one(); - } + for (auto id : buildIDs) + enqueueNotificationItem({NotificationItem::Type::BuildFinished, id}); /* Wake up any dependent steps that have no other dependencies. */ @@ -394,7 +386,7 @@ State::StepResult State::doBuildStep(nix::ref destStore, Step::ptr step, /* Send notification about this build and its dependents. */ { auto notificationSenderQueue_(notificationSenderQueue.lock()); - notificationSenderQueue_->push(NotificationItem{NotificationItem::Type::Finished, build->id, dependentIDs}); + notificationSenderQueue_->push(NotificationItem{NotificationItem::Type::BuildFinished, build->id, dependentIDs}); } notificationSenderWakeup.notify_one(); diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index baacb1cc..7ffd071b 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -213,7 +213,7 @@ void State::clearBusy(Connection & conn, time_t stopTime) } -int State::allocBuildStep(pqxx::work & txn, Build::ptr build) +unsigned int State::allocBuildStep(pqxx::work & txn, Build::ptr build) { /* Acquire an exclusive lock on BuildSteps to ensure that we don't race with other threads creating a step of the same build. */ @@ -224,10 +224,10 @@ int State::allocBuildStep(pqxx::work & txn, Build::ptr build) } -int State::createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step, +unsigned int State::createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step, const std::string & machine, BuildStatus status, const std::string & errorMsg, BuildID propagatedFrom) { - int stepNr = allocBuildStep(txn, build); + unsigned int stepNr = allocBuildStep(txn, build); txn.parameterized ("insert into BuildSteps (build, stepnr, type, drvPath, busy, startTime, system, status, propagatedFrom, errorMsg, stopTime, machine) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)") @@ -254,7 +254,7 @@ int State::createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, void State::finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, unsigned int overhead, - BuildID buildId, int stepNr, const std::string & machine, BuildStatus status, + BuildID buildId, unsigned int stepNr, const std::string & machine, BuildStatus status, const std::string & errorMsg, BuildID propagatedFrom) { assert(startTime); @@ -420,20 +420,21 @@ void State::logCompressor() while (true) { try { - Path logPath; + CompressionItem item; { auto logCompressorQueue_(logCompressorQueue.lock()); while (logCompressorQueue_->empty()) logCompressorQueue_.wait(logCompressorWakeup); - logPath = logCompressorQueue_->front(); + item = logCompressorQueue_->front(); logCompressorQueue_->pop(); } - if (!pathExists(logPath)) continue; + if (!pathExists(item.logPath)) continue; - printMsg(lvlChatty, format("compressing log file ‘%1%’") % logPath); + printMsg(lvlChatty, format("compressing log file ‘%1%’") % item.logPath); - Path tmpPath = logPath + ".bz2.tmp"; + Path dstPath = item.logPath + ".bz2"; + Path tmpPath = dstPath + ".tmp"; AutoCloseFD fd = open(tmpPath.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0644); @@ -442,7 +443,7 @@ void State::logCompressor() Pid pid = startProcess([&]() { if (dup2(fd, STDOUT_FILENO) == -1) throw SysError("cannot dup output pipe to stdout"); - execlp("bzip2", "bzip2", "-c", logPath.c_str(), nullptr); + execlp("bzip2", "bzip2", "-c", item.logPath.c_str(), nullptr); throw SysError("cannot start bzip2"); }); @@ -450,13 +451,18 @@ void State::logCompressor() if (res != 0) throw Error(format("bzip2 returned exit code %1% while compressing ‘%2%’") - % res % logPath); + % res % item.logPath); - if (rename(tmpPath.c_str(), (logPath + ".bz2").c_str()) != 0) + if (rename(tmpPath.c_str(), dstPath.c_str()) != 0) throw SysError(format("renaming ‘%1%’") % tmpPath); - if (unlink(logPath.c_str()) != 0) - throw SysError(format("unlinking ‘%1%’") % logPath); + if (unlink(item.logPath.c_str()) != 0) + throw SysError(format("unlinking ‘%1%’") % item.logPath); + + /* Run plugins. We do this after log compression to ensure + that the log file doesn't change while the plugins may + be accessing it. */ + enqueueNotificationItem({NotificationItem::Type::StepFinished, item.id, {}, item.stepNr, dstPath}); } catch (std::exception & e) { printMsg(lvlError, format("log compressor: %1%") % e.what()); @@ -483,9 +489,22 @@ void State::notificationSender() printMsg(lvlChatty, format("sending notification about build %1%") % item.id); Pid pid = startProcess([&]() { - Strings argv({"hydra-notify", item.type == NotificationItem::Type::Started ? "build-started" : "build-finished", std::to_string(item.id)}); - for (auto id : item.dependentIds) - argv.push_back(std::to_string(id)); + Strings argv; + switch (item.type) { + case NotificationItem::Type::BuildStarted: + argv = {"hydra-notify", "build-started", std::to_string(item.id)}; + for (auto id : item.dependentIds) + argv.push_back(std::to_string(id)); + break; + case NotificationItem::Type::BuildFinished: + argv = {"hydra-notify", "build-finished", std::to_string(item.id)}; + for (auto id : item.dependentIds) + argv.push_back(std::to_string(id)); + break; + case NotificationItem::Type::StepFinished: + argv = {"hydra-notify", "step-finished", std::to_string(item.id), std::to_string(item.stepNr), item.logPath}; + break; + }; execvp("hydra-notify", (char * *) stringsToCharPtrs(argv).data()); // FIXME: remove cast throw SysError("cannot start hydra-notify"); }); diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index ce621984..684af317 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -311,7 +311,13 @@ private: counter nrActiveDbUpdates{0}; /* Log compressor work queue. */ - nix::Sync> logCompressorQueue; + struct CompressionItem + { + BuildID id; + unsigned int stepNr; + nix::Path logPath; + }; + nix::Sync> logCompressorQueue; std::condition_variable logCompressorWakeup; /* Notification sender work queue. FIXME: if hydra-queue-runner is @@ -321,16 +327,28 @@ private: struct NotificationItem { enum class Type : char { - Started, - Finished + BuildStarted, + BuildFinished, + StepFinished, }; Type type; BuildID id; std::vector dependentIds; + unsigned int stepNr; + nix::Path logPath; }; nix::Sync> notificationSenderQueue; std::condition_variable notificationSenderWakeup; + void enqueueNotificationItem(const NotificationItem && item) + { + { + auto notificationSenderQueue_(notificationSenderQueue.lock()); + notificationSenderQueue_->emplace(item); + } + notificationSenderWakeup.notify_one(); + } + /* Specific build to do for --build-one (testing only). */ BuildID buildOne; @@ -397,14 +415,14 @@ private: /* Thread to reload /etc/nix/machines periodically. */ void monitorMachinesFile(); - int allocBuildStep(pqxx::work & txn, Build::ptr build); + unsigned int allocBuildStep(pqxx::work & txn, Build::ptr build); - int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step, + unsigned int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step, const std::string & machine, BuildStatus status, const std::string & errorMsg = "", BuildID propagatedFrom = 0); void finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, - unsigned int overhead, BuildID buildId, int stepNr, + unsigned int overhead, BuildID buildId, unsigned int stepNr, const std::string & machine, BuildStatus status, const std::string & errorMsg = "", BuildID propagatedFrom = 0); diff --git a/src/lib/Hydra/Helper/AddBuilds.pm b/src/lib/Hydra/Helper/AddBuilds.pm index 0fa7087f..e756000e 100644 --- a/src/lib/Hydra/Helper/AddBuilds.pm +++ b/src/lib/Hydra/Helper/AddBuilds.pm @@ -16,7 +16,6 @@ use File::Path; use File::Temp; use File::Spec; use File::Slurp; -use Hydra::Helper::PluginHooks; use Hydra::Helper::CatalystUtils; our @ISA = qw(Exporter); diff --git a/src/lib/Hydra/Helper/PluginHooks.pm b/src/lib/Hydra/Helper/PluginHooks.pm deleted file mode 100644 index 8ff59bd2..00000000 --- a/src/lib/Hydra/Helper/PluginHooks.pm +++ /dev/null @@ -1,35 +0,0 @@ -package Hydra::Helper::PluginHooks; - -use strict; -use Exporter; - -our @ISA = qw(Exporter); -our @EXPORT = qw( - notifyBuildStarted - notifyBuildFinished); - -sub notifyBuildStarted { - my ($plugins, $build) = @_; - foreach my $plugin (@{$plugins}) { - eval { - $plugin->buildStarted($build); - }; - if ($@) { - print STDERR "$plugin->buildStarted: $@\n"; - } - } -} - -sub notifyBuildFinished { - my ($plugins, $build, $dependents) = @_; - foreach my $plugin (@{$plugins}) { - eval { - $plugin->buildFinished($build, $dependents); - }; - if ($@) { - print STDERR "$plugin->buildFinished: $@\n"; - } - } -} - -1; diff --git a/src/lib/Hydra/Plugin.pm b/src/lib/Hydra/Plugin.pm index 7783fb9e..a00ab5ba 100644 --- a/src/lib/Hydra/Plugin.pm +++ b/src/lib/Hydra/Plugin.pm @@ -33,6 +33,12 @@ sub buildFinished { my ($self, $build, $dependents) = @_; } +# Called when step $step has finished. The build log is stored in the +# file $logPath (bzip2-compressed). +sub stepFinished { + my ($self, $step, $logPath) = @_; +} + # Called to determine the set of supported input types. The plugin # should add these to the $inputTypes hashref, e.g. $inputTypes{'svn'} # = 'Subversion checkout'. diff --git a/src/script/hydra-notify b/src/script/hydra-notify index e930a0dd..b0984e8e 100755 --- a/src/script/hydra-notify +++ b/src/script/hydra-notify @@ -4,7 +4,6 @@ use strict; use utf8; use Hydra::Plugin; use Hydra::Helper::Nix; -use Hydra::Helper::PluginHooks; use Hydra::Helper::AddBuilds; STDERR->autoflush(1); @@ -16,11 +15,12 @@ my $db = Hydra::Model::DB->new(); my @plugins = Hydra::Plugin->instantiate(db => $db, config => $config); -my $cmd = shift @ARGV or die "Syntax: hydra-notify CMD BUILD-ID [BUILD-IDs...]\n"; +my $cmd = shift @ARGV or die "Syntax: hydra-notify build-started BUILD | build-finished BUILD-ID [BUILD-IDs...] | step-finished BUILD-ID STEP-NR LOG-PATH\n"; my $buildId = shift @ARGV or die; my $build = $db->resultset('Builds')->find($buildId) or die "build $buildId does not exist\n"; + if ($cmd eq "build-finished") { my $project = $build->project; my $jobset = $build->jobset; @@ -33,9 +33,36 @@ if ($cmd eq "build-finished") { or die "build $id does not exist\n"; push @dependents, $dep; } - notifyBuildFinished(\@plugins, $build, [@dependents]); -} elsif ($cmd eq "build-started") { - notifyBuildStarted(\@plugins, $build); + + foreach my $plugin (@plugins) { + eval { $plugin->buildFinished($build, [@dependents]); }; + if ($@) { + print STDERR "$plugin->buildFinished: $@\n"; + } + } +} + +elsif ($cmd eq "build-started") { + foreach my $plugin (@plugins) { + eval { $plugin->buildStarted($build); }; + if ($@) { + print STDERR "$plugin->buildStarted: $@\n"; + } + } +} + +elsif ($cmd eq "step-finished") { + my $stepNr = shift @ARGV or die; + my $step = $build->buildsteps->find({stepnr => $stepNr}) + or die "step $stepNr does not exist\n"; + my $logPath = shift @ARGV or die; + + foreach my $plugin (@plugins) { + eval { $plugin->stepFinished($step, $logPath); }; + if ($@) { + print STDERR "$plugin->stepFinished: $@\n"; + } + } } else {