Asynchronously compress build logs

This commit is contained in:
Eelco Dolstra 2015-06-19 14:51:59 +02:00
parent 8e408048e2
commit 133d298e26
5 changed files with 97 additions and 13 deletions

View file

@ -117,12 +117,13 @@ void buildRemote(std::shared_ptr<StoreAPI> store,
RemoteResult & result)
{
string base = baseNameOf(drvPath);
Path logFile = logDir + "/" + string(base, 0, 2) + "/" + string(base, 2);
result.logFile = logDir + "/" + string(base, 0, 2) + "/" + string(base, 2);
AutoDelete autoDelete(result.logFile, false);
createDirs(dirOf(logFile));
createDirs(dirOf(result.logFile));
AutoCloseFD logFD(open(logFile.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0666));
if (logFD == -1) throw SysError(format("creating log file %1%") % logFile);
AutoCloseFD logFD(open(result.logFile.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0666));
if (logFD == -1) throw SysError(format("creating log file %1%") % result.logFile);
Child child;
openConnection(sshName, sshKey, logFD, child);
@ -146,7 +147,8 @@ void buildRemote(std::shared_ptr<StoreAPI> store,
throw Error(format("unsupported nix-store --serve protocol version on %1%") % sshName);
} catch (EndOfFile & e) {
child.pid.wait(true);
throw Error(format("cannot connect to %1%: %2%") % sshName % chomp(readFile(logFile)));
string s = chomp(readFile(result.logFile));
throw Error(format("cannot connect to %1%: %2%") % sshName % s);
}
/* Gather the inputs. */
@ -163,6 +165,8 @@ void buildRemote(std::shared_ptr<StoreAPI> store,
printMsg(lvlDebug, format("sending closure of %1% to %2%") % drvPath % sshName);
copyClosureTo(store, from, to, inputs);
autoDelete.cancel();
/* Do the build. */
printMsg(lvlDebug, format("building %1% on %2%") % drvPath % sshName);
writeInt(cmdBuildPaths, to);

View file

@ -13,6 +13,7 @@ struct RemoteResult
} status = rrMiscFailure;
std::string errorMsg;
time_t startTime = 0, stopTime = 0;
nix::Path logFile;
};
void buildRemote(std::shared_ptr<nix::StoreAPI> store,

View file

@ -58,8 +58,8 @@ BuildResult getBuildResult(std::shared_ptr<StoreAPI> store, const Derivation & d
}
product.defaultPath = words.empty() ? "" : words.front();
/* Ensure that the path exists and points into the
Nix store. */
/* Ensure that the path exists and points into the Nix
store. */
if (product.path == "" || product.path[0] != '/') continue;
product.path = canonPath(product.path, true);
if (!isInStore(product.path) || !pathExists(product.path)) continue;

View file

@ -2,6 +2,7 @@
#include <condition_variable>
#include <iostream>
#include <map>
#include <queue>
#include <memory>
#include <thread>
#include <cmath>
@ -10,6 +11,10 @@
#include <pqxx/pqxx>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "build-result.hh"
#include "build-remote.hh"
#include "sync.hh"
@ -229,8 +234,6 @@ private:
typedef std::list<Step::wptr> Runnable;
Sync<Runnable> runnable;
std::condition_variable_any runnableWakeup;
/* CV for waking up the dispatcher. */
std::condition_variable dispatcherWakeup;
std::mutex dispatcherMutex;
@ -252,15 +255,21 @@ private:
counter nrQueueWakeups{0};
counter nrDispatcherWakeups{0};
/* Log compressor work queue. */
Sync<std::queue<Path>> logCompressorQueue;
std::condition_variable_any logCompressorWakeup;
public:
State();
~State();
void loadMachines();
void clearBusy(time_t stopTime);
private:
void loadMachines();
int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step,
const std::string & machine, BuildStepStatus status, const std::string & errorMsg = "",
BuildID propagatedFrom = 0);
@ -302,6 +311,11 @@ public:
bool checkCachedFailure(Step::ptr step, Connection & conn);
/* Thread that asynchronously bzips logs of finished steps. */
void logCompressor();
public:
void dumpStatus();
void run();
@ -951,7 +965,7 @@ void State::dispatcher()
void State::wakeDispatcher()
{
{ std::lock_guard<std::mutex> lock(dispatcherMutex); } // barrier
dispatcherWakeup.notify_all();
dispatcherWakeup.notify_one();
}
@ -1063,6 +1077,7 @@ bool State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
txn.commit();
}
/* Do the build. */
try {
/* FIXME: referring builds may have conflicting timeouts. */
buildRemote(store, machine->sshName, machine->sshKey, step->drvPath, step->drv,
@ -1077,6 +1092,15 @@ bool State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
if (!result.stopTime) result.stopTime = time(0);
/* Asynchronously compress the log. */
if (result.logFile != "") {
{
auto logCompressorQueue_(logCompressorQueue.lock());
logCompressorQueue_->push(result.logFile);
}
logCompressorWakeup.notify_one();
}
/* The step had a hopefully temporary failure (e.g. network
issue). Retry a number of times. */
if (result.status == RemoteResult::rrMiscFailure) {
@ -1321,6 +1345,57 @@ bool State::checkCachedFailure(Step::ptr step, Connection & conn)
}
void State::logCompressor()
{
while (true) {
try {
Path logPath;
{
auto logCompressorQueue_(logCompressorQueue.lock());
while (logCompressorQueue_->empty())
logCompressorQueue_.wait(logCompressorWakeup);
logPath = logCompressorQueue_->front();
logCompressorQueue_->pop();
}
if (!pathExists(logPath)) continue;
printMsg(lvlChatty, format("compressing log file %1%") % logPath);
Path tmpPath = logPath + ".bz2.tmp";
AutoCloseFD fd = open(tmpPath.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0644);
// FIXME: use libbz2
Pid pid = startProcess([&]() {
if (dup2(fd, STDOUT_FILENO) == -1)
throw SysError("cannot dup output pipe to stdout");
execlp("bzip2", "bzip2", "-c", logPath.c_str(), nullptr);
throw SysError("cannot start ssh");
});
int res = pid.wait(true);
if (res != 0)
throw Error(format("bzip2 returned exit code %1% while compressing %2%")
% res % logPath);
if (rename(tmpPath.c_str(), (logPath + ".bz2").c_str()) != 0)
throw SysError(format("renaming %1%") % tmpPath);
if (unlink(logPath.c_str()) != 0)
throw SysError(format("unlinking %1%") % logPath);
} catch (std::exception & e) {
printMsg(lvlError, format("log compressor: %1%") % e.what());
sleep(5);
}
}
}
void State::dumpStatus()
{
{
@ -1368,6 +1443,10 @@ void State::run()
std::thread(&State::dispatcher, this).detach();
/* Run a log compressor thread. If needed, we could start more
than one. */
std::thread(&State::logCompressor, this).detach();
while (true) {
try {
auto conn(dbPool.get());

View file

@ -134,7 +134,7 @@ sub getDrvLogPath {
my $bucketed = substr($base, 0, 2) . "/" . substr($base, 2);
my $fn = ($ENV{NIX_LOG_DIR} || "/nix/var/log/nix") . "/drvs/";
my $fn2 = Hydra::Model::DB::getHydraPath . "/build-logs/";
for ($fn2 . $bucketed, $fn . $bucketed . ".bz2", $fn . $bucketed, $fn . $base . ".bz2", $fn . $base) {
for ($fn2 . $bucketed, $fn2 . $bucketed . ".bz2", $fn . $bucketed . ".bz2", $fn . $bucketed, $fn . $base . ".bz2", $fn . $base) {
return $_ if -f $_;
}
return undef;