diff --git a/src/hydra-queue-runner/builder.cc b/src/hydra-queue-runner/builder.cc index 667a5192..ef690645 100644 --- a/src/hydra-queue-runner/builder.cc +++ b/src/hydra-queue-runner/builder.cc @@ -146,6 +146,13 @@ State::StepResult State::doBuildStep(nix::ref destStore, auto orphanedSteps_(orphanedSteps.lock()); orphanedSteps_->emplace(buildId, stepNr); } + + if (stepNr) { + /* Asynchronously run plugins. FIXME: if we're killed, + plugin actions might not be run. Need to ensure + at-least-once semantics. */ + enqueueNotificationItem({NotificationItem::Type::StepFinished, buildId, {}, stepNr, result.logFile}); + } }); time_t stepStartTime = result.startTime = time(0); @@ -205,16 +212,6 @@ State::StepResult State::doBuildStep(nix::ref destStore, } } - /* Asynchronously compress the log. */ - if (result.logFile != "") { - { - auto logCompressorQueue_(logCompressorQueue.lock()); - assert(stepNr); - logCompressorQueue_->push({buildId, stepNr, result.logFile}); - } - logCompressorWakeup.notify_one(); - } - /* The step had a hopefully temporary failure (e.g. network issue). Retry a number of times. */ if (result.canRetry) { @@ -446,7 +443,7 @@ State::StepResult State::doBuildStep(nix::ref destStore, machine->state->totalStepTime += stepStopTime - stepStartTime; machine->state->totalStepBuildTime += result.stopTime - result.startTime; - if (quit) exit(0); // testing hack + if (quit) exit(0); // testing hack; FIXME: this won't run plugins return sDone; } diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index f66b0f9d..402960b3 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -460,63 +460,6 @@ bool State::checkCachedFailure(Step::ptr step, Connection & conn) } -void State::logCompressor() -{ - while (true) { - try { - - CompressionItem item; - { - auto logCompressorQueue_(logCompressorQueue.lock()); - while (logCompressorQueue_->empty()) - logCompressorQueue_.wait(logCompressorWakeup); - item = logCompressorQueue_->front(); - logCompressorQueue_->pop(); - } - - if (!pathExists(item.logPath)) continue; - - printMsg(lvlChatty, format("compressing log file ‘%1%’") % item.logPath); - - Path dstPath = item.logPath + ".bz2"; - Path tmpPath = dstPath + ".tmp"; - - AutoCloseFD fd = open(tmpPath.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0644); - - // FIXME: use libbz2 - - Pid pid = startProcess([&]() { - if (dup2(fd.get(), STDOUT_FILENO) == -1) - throw SysError("cannot dup output pipe to stdout"); - execlp("bzip2", "bzip2", "-c", item.logPath.c_str(), nullptr); - throw SysError("cannot start bzip2"); - }); - - int res = pid.wait(); - - if (res != 0) - throw Error(format("bzip2 returned exit code %1% while compressing ‘%2%’") - % res % item.logPath); - - if (rename(tmpPath.c_str(), dstPath.c_str()) != 0) - throw SysError(format("renaming ‘%1%’") % tmpPath); - - if (unlink(item.logPath.c_str()) != 0) - throw SysError(format("unlinking ‘%1%’") % item.logPath); - - /* Run plugins. We do this after log compression to ensure - that the log file doesn't change while the plugins may - be accessing it. */ - enqueueNotificationItem({NotificationItem::Type::StepFinished, item.id, {}, item.stepNr, dstPath}); - - } catch (std::exception & e) { - printMsg(lvlError, format("log compressor: %1%") % e.what()); - sleep(5); - } - } -} - - void State::notificationSender() { while (true) { @@ -860,10 +803,6 @@ void State::run(BuildID buildOne) std::thread(&State::dispatcher, this).detach(); - /* Run a log compressor thread. If needed, we could start more - than one. */ - std::thread(&State::logCompressor, this).detach(); - /* Idem for notification sending. */ std::thread(&State::notificationSender, this).detach(); diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index d4cbbb5c..285dac99 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -321,16 +321,6 @@ private: counter bytesReceived{0}; counter nrActiveDbUpdates{0}; - /* Log compressor work queue. */ - struct CompressionItem - { - BuildID id; - unsigned int stepNr; - nix::Path logPath; - }; - nix::Sync> logCompressorQueue; - std::condition_variable logCompressorWakeup; - /* Notification sender work queue. FIXME: if hydra-queue-runner is killed before it has finished sending notifications about a build, then the notifications may be lost. It would be better @@ -508,9 +498,6 @@ private: bool checkCachedFailure(Step::ptr step, Connection & conn); - /* Thread that asynchronously bzips logs of finished steps. */ - void logCompressor(); - /* Thread that asynchronously invokes hydra-notify to send build notifications. */ void notificationSender(); diff --git a/src/lib/Hydra/Plugin/CompressLog.pm b/src/lib/Hydra/Plugin/CompressLog.pm new file mode 100644 index 00000000..bf8d4327 --- /dev/null +++ b/src/lib/Hydra/Plugin/CompressLog.pm @@ -0,0 +1,19 @@ +package Hydra::Plugin::CompressLog; + +use strict; +use utf8; +use parent 'Hydra::Plugin'; +use Hydra::Helper::CatalystUtils; + +sub stepFinished { + my ($self, $step, $logPath) = @_; + + my $doCompress = $self->{config}->{'compress_build_logs'} // "1"; + + if ($doCompress eq "1" && -e $logPath) { + print STDERR "compressing ‘$logPath’...\n"; + system("bzip2", "--force", $logPath); + } +} + +1;