diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 37f9f9e9..8a15a96e 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -261,6 +261,14 @@ private: Sync> logCompressorQueue; std::condition_variable_any logCompressorWakeup; + /* Notification sender work queue. FIXME: if hydra-queue-runner is + killed before it has finished sending notifications about a + build, then the notifications may be lost. It would be better + to mark builds with pending notification in the database. */ + typedef std::pair> NotificationItem; + Sync> notificationSenderQueue; + std::condition_variable_any notificationSenderWakeup; + public: State(); @@ -314,6 +322,10 @@ private: /* Thread that asynchronously bzips logs of finished steps. */ void logCompressor(); + /* Thread that asynchronously invokes hydra-notify to send build + notifications. */ + void notificationSender(); + /* Acquire the global queue runner lock, or null if somebody else has it. */ std::shared_ptr acquireGlobalLock(); @@ -1186,6 +1198,13 @@ bool State::doBuildStep(std::shared_ptr store, Step::ptr step, } } + /* Send notification about this build. */ + { + auto notificationSenderQueue_(notificationSenderQueue.lock()); + notificationSenderQueue_->push(NotificationItem(build->id, std::vector())); + } + notificationSenderWakeup.notify_one(); + /* Wake up any dependent steps that have no other dependencies. */ { @@ -1213,6 +1232,8 @@ bool State::doBuildStep(std::shared_ptr store, Step::ptr step, /* Register failure in the database for all Build objects that directly or indirectly depend on this step. */ + std::vector dependentIDs; + while (true) { /* Get the builds and steps that depend on this step. */ @@ -1273,6 +1294,7 @@ bool State::doBuildStep(std::shared_ptr store, Step::ptr step, for (auto & build2 : indirect) { if (build2->finishedInDB) continue; printMsg(lvlError, format("marking build %1% as failed") % build2->id); + dependentIDs.push_back(build2->id); txn.parameterized ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $4, isCachedBuild = $5 where id = $1 and finished = 0") (build2->id) @@ -1301,6 +1323,13 @@ bool State::doBuildStep(std::shared_ptr store, Step::ptr step, } } + /* Send notification about this build and its dependents. */ + { + auto notificationSenderQueue_(notificationSenderQueue.lock()); + notificationSenderQueue_->push(NotificationItem(build->id, dependentIDs)); + } + notificationSenderWakeup.notify_one(); + } // FIXME: keep stats about aborted steps? @@ -1391,7 +1420,7 @@ void State::logCompressor() if (dup2(fd, STDOUT_FILENO) == -1) throw SysError("cannot dup output pipe to stdout"); execlp("bzip2", "bzip2", "-c", logPath.c_str(), nullptr); - throw SysError("cannot start ssh"); + throw SysError("cannot start bzip2"); }); int res = pid.wait(true); @@ -1414,6 +1443,44 @@ void State::logCompressor() } +void State::notificationSender() +{ + while (true) { + try { + + NotificationItem item; + { + auto notificationSenderQueue_(notificationSenderQueue.lock()); + while (notificationSenderQueue_->empty()) + notificationSenderQueue_.wait(notificationSenderWakeup); + item = notificationSenderQueue_->front(); + notificationSenderQueue_->pop(); + } + + printMsg(lvlChatty, format("sending notification about build %1%") % item.first); + + Pid pid = startProcess([&]() { + Strings argv({"hydra-notify", "build", int2String(item.first)}); + for (auto id : item.second) + argv.push_back(int2String(id)); + execvp("hydra-notify", (char * *) stringsToCharPtrs(argv).data()); // FIXME: remove cast + throw SysError("cannot start hydra-notify"); + }); + + int res = pid.wait(true); + + if (res != 0) + throw Error(format("hydra-build returned exit code %1% notifying about build %2%") + % res % item.first); + + } catch (std::exception & e) { + printMsg(lvlError, format("notification sender: %1%") % e.what()); + sleep(5); + } + } +} + + std::shared_ptr State::acquireGlobalLock() { Path lockPath = hydraData + "/queue-runner"; @@ -1580,7 +1647,7 @@ void State::run() loadMachines(); - auto queueMonitorThread = std::thread(&State::queueMonitor, this); + std::thread(&State::queueMonitor, this).detach(); std::thread(&State::dispatcher, this).detach(); @@ -1588,6 +1655,11 @@ void State::run() than one. */ std::thread(&State::logCompressor, this).detach(); + /* Idem for notification sending. */ + std::thread(&State::notificationSender, this).detach(); + + /* Monitor the database for status dump requests (e.g. from + ‘hydra-queue-runner --status’). */ while (true) { try { auto conn(dbPool.get()); @@ -1601,9 +1673,6 @@ void State::run() sleep(10); // probably a DB problem, so don't retry right away } } - - // Never reached. - queueMonitorThread.join(); } diff --git a/src/script/Makefile.am b/src/script/Makefile.am index cfdeea8c..ce612f37 100644 --- a/src/script/Makefile.am +++ b/src/script/Makefile.am @@ -9,6 +9,7 @@ distributable_scripts = \ hydra-update-gc-roots \ hydra-s3-backup-collect-garbage \ hydra-create-user \ + hydra-notify \ nix-prefetch-git \ nix-prefetch-bzr \ nix-prefetch-hg diff --git a/src/script/hydra-evaluator b/src/script/hydra-evaluator index bcdb948e..7dce10c7 100755 --- a/src/script/hydra-evaluator +++ b/src/script/hydra-evaluator @@ -1,4 +1,4 @@ -#! /var/run/current-system/sw/bin/perl +#! /run/current-system/sw/bin/perl use strict; use utf8; diff --git a/src/script/hydra-notify b/src/script/hydra-notify new file mode 100755 index 00000000..cf8599d3 --- /dev/null +++ b/src/script/hydra-notify @@ -0,0 +1,35 @@ +#! /run/current-system/sw/bin/perl + +use strict; +use utf8; +use Hydra::Plugin; +use Hydra::Helper::Nix; +use Hydra::Helper::PluginHooks; + +STDERR->autoflush(1); +binmode STDERR, ":encoding(utf8)"; + +my $config = getHydraConfig(); + +my $db = Hydra::Model::DB->new(); + +my @plugins = Hydra::Plugin->instantiate(db => $db, config => $config); + +my $cmd = shift @ARGV or die "Syntax: hydra-notify build BUILD-ID [BUILD-IDs...]\n"; + +if ($cmd eq "build") { + my $buildId = shift @ARGV or die; + my $build = $db->resultset('Builds')->find($buildId) + or die "build $buildId does not exist\n"; + my @dependents; + foreach my $id (@ARGV) { + my $dep = $db->resultset('Builds')->find($id) + or die "build $id does not exist\n"; + push @dependents, $dep; + } + notifyBuildFinished(\@plugins, $build, [@dependents]); +} + +else { + die "unknown action ‘$cmd’"; +}