Move log compression to a plugin

This commit is contained in:
Eelco Dolstra 2017-03-13 17:18:22 +01:00
parent 285754aff6
commit 7e6486e694
No known key found for this signature in database
GPG key ID: 8170B4726D7198DE
4 changed files with 27 additions and 85 deletions

View file

@ -146,6 +146,13 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
auto orphanedSteps_(orphanedSteps.lock()); auto orphanedSteps_(orphanedSteps.lock());
orphanedSteps_->emplace(buildId, stepNr); orphanedSteps_->emplace(buildId, stepNr);
} }
if (stepNr) {
/* Asynchronously run plugins. FIXME: if we're killed,
plugin actions might not be run. Need to ensure
at-least-once semantics. */
enqueueNotificationItem({NotificationItem::Type::StepFinished, buildId, {}, stepNr, result.logFile});
}
}); });
time_t stepStartTime = result.startTime = time(0); time_t stepStartTime = result.startTime = time(0);
@ -205,16 +212,6 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
} }
} }
/* Asynchronously compress the log. */
if (result.logFile != "") {
{
auto logCompressorQueue_(logCompressorQueue.lock());
assert(stepNr);
logCompressorQueue_->push({buildId, stepNr, result.logFile});
}
logCompressorWakeup.notify_one();
}
/* The step had a hopefully temporary failure (e.g. network /* The step had a hopefully temporary failure (e.g. network
issue). Retry a number of times. */ issue). Retry a number of times. */
if (result.canRetry) { if (result.canRetry) {
@ -446,7 +443,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
machine->state->totalStepTime += stepStopTime - stepStartTime; machine->state->totalStepTime += stepStopTime - stepStartTime;
machine->state->totalStepBuildTime += result.stopTime - result.startTime; machine->state->totalStepBuildTime += result.stopTime - result.startTime;
if (quit) exit(0); // testing hack if (quit) exit(0); // testing hack; FIXME: this won't run plugins
return sDone; return sDone;
} }

View file

@ -460,63 +460,6 @@ bool State::checkCachedFailure(Step::ptr step, Connection & conn)
} }
void State::logCompressor()
{
while (true) {
try {
CompressionItem item;
{
auto logCompressorQueue_(logCompressorQueue.lock());
while (logCompressorQueue_->empty())
logCompressorQueue_.wait(logCompressorWakeup);
item = logCompressorQueue_->front();
logCompressorQueue_->pop();
}
if (!pathExists(item.logPath)) continue;
printMsg(lvlChatty, format("compressing log file %1%") % item.logPath);
Path dstPath = item.logPath + ".bz2";
Path tmpPath = dstPath + ".tmp";
AutoCloseFD fd = open(tmpPath.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0644);
// FIXME: use libbz2
Pid pid = startProcess([&]() {
if (dup2(fd.get(), STDOUT_FILENO) == -1)
throw SysError("cannot dup output pipe to stdout");
execlp("bzip2", "bzip2", "-c", item.logPath.c_str(), nullptr);
throw SysError("cannot start bzip2");
});
int res = pid.wait();
if (res != 0)
throw Error(format("bzip2 returned exit code %1% while compressing %2%")
% res % item.logPath);
if (rename(tmpPath.c_str(), dstPath.c_str()) != 0)
throw SysError(format("renaming %1%") % tmpPath);
if (unlink(item.logPath.c_str()) != 0)
throw SysError(format("unlinking %1%") % item.logPath);
/* Run plugins. We do this after log compression to ensure
that the log file doesn't change while the plugins may
be accessing it. */
enqueueNotificationItem({NotificationItem::Type::StepFinished, item.id, {}, item.stepNr, dstPath});
} catch (std::exception & e) {
printMsg(lvlError, format("log compressor: %1%") % e.what());
sleep(5);
}
}
}
void State::notificationSender() void State::notificationSender()
{ {
while (true) { while (true) {
@ -860,10 +803,6 @@ void State::run(BuildID buildOne)
std::thread(&State::dispatcher, this).detach(); std::thread(&State::dispatcher, this).detach();
/* Run a log compressor thread. If needed, we could start more
than one. */
std::thread(&State::logCompressor, this).detach();
/* Idem for notification sending. */ /* Idem for notification sending. */
std::thread(&State::notificationSender, this).detach(); std::thread(&State::notificationSender, this).detach();

View file

@ -321,16 +321,6 @@ private:
counter bytesReceived{0}; counter bytesReceived{0};
counter nrActiveDbUpdates{0}; counter nrActiveDbUpdates{0};
/* Log compressor work queue. */
struct CompressionItem
{
BuildID id;
unsigned int stepNr;
nix::Path logPath;
};
nix::Sync<std::queue<CompressionItem>> logCompressorQueue;
std::condition_variable logCompressorWakeup;
/* Notification sender work queue. FIXME: if hydra-queue-runner is /* Notification sender work queue. FIXME: if hydra-queue-runner is
killed before it has finished sending notifications about a killed before it has finished sending notifications about a
build, then the notifications may be lost. It would be better build, then the notifications may be lost. It would be better
@ -508,9 +498,6 @@ private:
bool checkCachedFailure(Step::ptr step, Connection & conn); bool checkCachedFailure(Step::ptr step, Connection & conn);
/* Thread that asynchronously bzips logs of finished steps. */
void logCompressor();
/* Thread that asynchronously invokes hydra-notify to send build /* Thread that asynchronously invokes hydra-notify to send build
notifications. */ notifications. */
void notificationSender(); void notificationSender();

View file

@ -0,0 +1,19 @@
package Hydra::Plugin::CompressLog;
use strict;
use utf8;
use parent 'Hydra::Plugin';
use Hydra::Helper::CatalystUtils;
sub stepFinished {
my ($self, $step, $logPath) = @_;
my $doCompress = $self->{config}->{'compress_build_logs'} // "1";
if ($doCompress eq "1" && -e $logPath) {
print STDERR "compressing $logPath...\n";
system("bzip2", "--force", $logPath);
}
}
1;