diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index e9278705..7f02d081 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -117,12 +117,13 @@ void buildRemote(std::shared_ptr store, RemoteResult & result) { string base = baseNameOf(drvPath); - Path logFile = logDir + "/" + string(base, 0, 2) + "/" + string(base, 2); + result.logFile = logDir + "/" + string(base, 0, 2) + "/" + string(base, 2); + AutoDelete autoDelete(result.logFile, false); - createDirs(dirOf(logFile)); + createDirs(dirOf(result.logFile)); - AutoCloseFD logFD(open(logFile.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0666)); - if (logFD == -1) throw SysError(format("creating log file ‘%1%’") % logFile); + AutoCloseFD logFD(open(result.logFile.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0666)); + if (logFD == -1) throw SysError(format("creating log file ‘%1%’") % result.logFile); Child child; openConnection(sshName, sshKey, logFD, child); @@ -146,7 +147,8 @@ void buildRemote(std::shared_ptr store, throw Error(format("unsupported ‘nix-store --serve’ protocol version on ‘%1%’") % sshName); } catch (EndOfFile & e) { child.pid.wait(true); - throw Error(format("cannot connect to ‘%1%’: %2%") % sshName % chomp(readFile(logFile))); + string s = chomp(readFile(result.logFile)); + throw Error(format("cannot connect to ‘%1%’: %2%") % sshName % s); } /* Gather the inputs. */ @@ -163,6 +165,8 @@ void buildRemote(std::shared_ptr store, printMsg(lvlDebug, format("sending closure of ‘%1%’ to ‘%2%’") % drvPath % sshName); copyClosureTo(store, from, to, inputs); + autoDelete.cancel(); + /* Do the build. 
*/ printMsg(lvlDebug, format("building ‘%1%’ on ‘%2%’") % drvPath % sshName); writeInt(cmdBuildPaths, to); diff --git a/src/hydra-queue-runner/build-remote.hh b/src/hydra-queue-runner/build-remote.hh index 99e79c8c..d932e8ae 100644 --- a/src/hydra-queue-runner/build-remote.hh +++ b/src/hydra-queue-runner/build-remote.hh @@ -13,6 +13,7 @@ struct RemoteResult } status = rrMiscFailure; std::string errorMsg; time_t startTime = 0, stopTime = 0; + nix::Path logFile; }; void buildRemote(std::shared_ptr store, diff --git a/src/hydra-queue-runner/build-result.cc b/src/hydra-queue-runner/build-result.cc index 78b9b25c..bb431623 100644 --- a/src/hydra-queue-runner/build-result.cc +++ b/src/hydra-queue-runner/build-result.cc @@ -58,8 +58,8 @@ BuildResult getBuildResult(std::shared_ptr store, const Derivation & d } product.defaultPath = words.empty() ? "" : words.front(); - /* Ensure that the path exists and points into the - Nix store. */ + /* Ensure that the path exists and points into the Nix + store. */ if (product.path == "" || product.path[0] != '/') continue; product.path = canonPath(product.path, true); if (!isInStore(product.path) || !pathExists(product.path)) continue; diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 498297fc..a9de0560 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -2,6 +2,7 @@ #include #include #include +#include #include #include #include @@ -10,6 +11,10 @@ #include +#include +#include +#include + #include "build-result.hh" #include "build-remote.hh" #include "sync.hh" @@ -229,8 +234,6 @@ private: typedef std::list Runnable; Sync runnable; - std::condition_variable_any runnableWakeup; - /* CV for waking up the dispatcher. */ std::condition_variable dispatcherWakeup; std::mutex dispatcherMutex; @@ -252,15 +255,21 @@ private: counter nrQueueWakeups{0}; counter nrDispatcherWakeups{0}; + /* Log compressor work queue. 
*/ + Sync> logCompressorQueue; + std::condition_variable_any logCompressorWakeup; + public: State(); ~State(); - void loadMachines(); - void clearBusy(time_t stopTime); +private: + + void loadMachines(); + int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step, const std::string & machine, BuildStepStatus status, const std::string & errorMsg = "", BuildID propagatedFrom = 0); @@ -302,6 +311,11 @@ public: bool checkCachedFailure(Step::ptr step, Connection & conn); + /* Thread that asynchronously bzips logs of finished steps. */ + void logCompressor(); + +public: + void dumpStatus(); void run(); @@ -951,7 +965,7 @@ void State::dispatcher() void State::wakeDispatcher() { { std::lock_guard lock(dispatcherMutex); } // barrier - dispatcherWakeup.notify_all(); + dispatcherWakeup.notify_one(); } @@ -1063,6 +1077,7 @@ bool State::doBuildStep(std::shared_ptr store, Step::ptr step, txn.commit(); } + /* Do the build. */ try { /* FIXME: referring builds may have conflicting timeouts. */ buildRemote(store, machine->sshName, machine->sshKey, step->drvPath, step->drv, @@ -1077,6 +1092,15 @@ bool State::doBuildStep(std::shared_ptr store, Step::ptr step, if (!result.stopTime) result.stopTime = time(0); + /* Asynchronously compress the log. */ + if (result.logFile != "") { + { + auto logCompressorQueue_(logCompressorQueue.lock()); + logCompressorQueue_->push(result.logFile); + } + logCompressorWakeup.notify_one(); + } + /* The step had a hopefully temporary failure (e.g. network issue). Retry a number of times. 
*/
     if (result.status == RemoteResult::rrMiscFailure) {
@@ -1321,6 +1345,66 @@ bool State::checkCachedFailure(Step::ptr step, Connection & conn)
 }
 
 
+/* Thread body: pop log paths from logCompressorQueue one at a time and
+   replace each log file by a bzip2-compressed copy with a ‘.bz2’ suffix.
+   Never returns; an error is reported and the loop resumes after 5 seconds. */
+void State::logCompressor()
+{
+    while (true) {
+        try {
+
+            Path logPath;
+            {
+                /* Sleep until the queue is non-empty, then take its head. */
+                auto logCompressorQueue_(logCompressorQueue.lock());
+                while (logCompressorQueue_->empty())
+                    logCompressorQueue_.wait(logCompressorWakeup);
+                logPath = logCompressorQueue_->front();
+                logCompressorQueue_->pop();
+            }
+
+            /* Nothing to do if the log file is not (or no longer) there. */
+            if (!pathExists(logPath)) continue;
+
+            printMsg(lvlChatty, format("compressing log file ‘%1%’") % logPath);
+
+            /* Compress into a temporary file that is only renamed to its
+               final name once bzip2 has succeeded. */
+            Path tmpPath = logPath + ".bz2.tmp";
+
+            AutoCloseFD fd = open(tmpPath.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0644);
+            if (fd == -1) throw SysError(format("creating ‘%1%’") % tmpPath);
+
+            // FIXME: use libbz2
+
+            Pid pid = startProcess([&]() {
+                if (dup2(fd, STDOUT_FILENO) == -1)
+                    throw SysError("cannot dup output pipe to stdout");
+                execlp("bzip2", "bzip2", "-c", logPath.c_str(), nullptr);
+                /* Only reached if execlp() failed. */
+                throw SysError("cannot start bzip2");
+            });
+
+            int res = pid.wait(true);
+
+            if (res != 0)
+                throw Error(format("bzip2 returned exit code %1% while compressing ‘%2%’")
+                    % res % logPath);
+
+            if (rename(tmpPath.c_str(), (logPath + ".bz2").c_str()) != 0)
+                throw SysError(format("renaming ‘%1%’") % tmpPath);
+
+            if (unlink(logPath.c_str()) != 0)
+                throw SysError(format("unlinking ‘%1%’") % logPath);
+
+        } catch (std::exception & e) {
+            printMsg(lvlError, format("log compressor: %1%") % e.what());
+            sleep(5);
+        }
+    }
+}
+
+
 void State::dumpStatus()
 {
     {
@@ -1368,6 +1452,10 @@ void State::run()
 
     std::thread(&State::dispatcher, this).detach();
 
+    /* Run a log compressor thread. If needed, we could start more
+       than one. */
+    std::thread(&State::logCompressor, this).detach();
+
     while (true) {
         try {
             auto conn(dbPool.get());
diff --git a/src/lib/Hydra/Helper/Nix.pm b/src/lib/Hydra/Helper/Nix.pm
index c54c8f10..23f807dd 100644
--- a/src/lib/Hydra/Helper/Nix.pm
+++ b/src/lib/Hydra/Helper/Nix.pm
@@ -134,7 +134,7 @@ sub getDrvLogPath {
     my $bucketed = substr($base, 0, 2) . "/" .
substr($base, 2); my $fn = ($ENV{NIX_LOG_DIR} || "/nix/var/log/nix") . "/drvs/"; my $fn2 = Hydra::Model::DB::getHydraPath . "/build-logs/"; - for ($fn2 . $bucketed, $fn . $bucketed . ".bz2", $fn . $bucketed, $fn . $base . ".bz2", $fn . $base) { + for ($fn2 . $bucketed, $fn2 . $bucketed . ".bz2", $fn . $bucketed . ".bz2", $fn . $bucketed, $fn . $base . ".bz2", $fn . $base) { return $_ if -f $_; } return undef;