diff --git a/configure.ac b/configure.ac index af0dd194..971b502d 100644 --- a/configure.ac +++ b/configure.ac @@ -73,6 +73,7 @@ AC_CONFIG_FILES([ doc/manual/Makefile src/Makefile src/hydra-eval-jobs/Makefile + src/hydra-queue-runner/Makefile src/sql/Makefile src/xsl/Makefile src/ttf/Makefile diff --git a/doc/manual/Makefile.am b/doc/manual/Makefile.am index 67fa3da6..7611153d 100644 --- a/doc/manual/Makefile.am +++ b/doc/manual/Makefile.am @@ -16,7 +16,7 @@ dblatex_opts = \ # Include the manual in the tarball. dist_html_DATA = manual.html style.css -dist_pdf_DATA = manual.pdf +#dist_pdf_DATA = manual.pdf # Embed Docbook's callout images in the distribution. EXTRA_DIST += images diff --git a/doc/manual/installation.xml b/doc/manual/installation.xml index fb7ef5ce..64af9374 100644 --- a/doc/manual/installation.xml +++ b/doc/manual/installation.xml @@ -100,13 +100,8 @@ nix-env -i hydra - Command completion should reveal a number of command-line tools from Hydra: - - -hydra-build hydra-init hydra-update-gc-roots -hydra-eval-jobs hydra-queue-runner -hydra-evaluator hydra-server - + Command completion should reveal a number of command-line tools + from Hydra, such as hydra-queue-runner. diff --git a/hydra-module.nix b/hydra-module.nix index 1a9c5b2e..8e728ee6 100644 --- a/hydra-module.nix +++ b/hydra-module.nix @@ -3,6 +3,7 @@ with lib; let + cfg = config.services.hydra; baseDir = "/var/lib/hydra"; @@ -11,19 +12,27 @@ let hydraEnv = { HYDRA_DBI = cfg.dbi; - HYDRA_CONFIG = "${baseDir}/data/hydra.conf"; - HYDRA_DATA = "${baseDir}/data"; + HYDRA_CONFIG = "${baseDir}/hydra.conf"; + HYDRA_DATA = "${baseDir}"; }; env = { NIX_REMOTE = "daemon"; - OPENSSL_X509_CERT_FILE = "/etc/ssl/certs/ca-bundle.crt"; - GIT_SSL_CAINFO = "/etc/ssl/certs/ca-bundle.crt"; + SSL_CERT_FILE = "/etc/ssl/certs/ca-certificates.crt"; + OPENSSL_X509_CERT_FILE = "/etc/ssl/certs/ca-certificates.crt"; # FIXME: remove on NixOS >= 15.07 + PGPASSFILE = "${baseDir}/pgpass"; } // hydraEnv // cfg.extraEnv; serverEnv = env // { HYDRA_TRACKER = cfg.tracker; + COLUMNS = "80"; + PGPASSFILE = "${baseDir}/pgpass-www"; # grrr } // (optionalAttrs cfg.debugServer { DBIC_TRACE = 1; }); + + localDB = "dbi:Pg:dbname=hydra;user=hydra;"; + + haveLocalDB = cfg.dbi == localDB; + in { @@ -41,8 +50,8 @@ in }; dbi = mkOption { - type = types.string; - default = "dbi:Pg:dbname=hydra;user=hydra;"; + type = types.str; + default = localDB; example = "dbi:Pg:dbname=hydra;host=postgres.example.org;user=foo;"; description = '' The DBI string for Hydra database connection. @@ -82,7 +91,7 @@ in minimumDiskFree = mkOption { type = types.int; - default = 5; + default = 0; description = '' Threshold of minimum disk space (GiB) to determine if queue runner should run or not. ''; @@ -90,7 +99,7 @@ in minimumDiskFreeEvaluator = mkOption { type = types.int; - default = 2; + default = 0; description = '' Threshold of minimum disk space (GiB) to determine if evaluator should run or not. ''; @@ -115,25 +124,25 @@ in type = types.nullOr types.path; default = null; description = '' - File name of an alternate logo to be displayed on the web pages. + Path to a file containing the logo of your Hydra instance. 
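          If it is null, the default Hydra logo is used.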
''; }; debugServer = mkOption { type = types.bool; default = false; - description = "Whether to run the server in debug mode"; + description = "Whether to run the server in debug mode."; }; extraConfig = mkOption { type = types.lines; - description = "Extra lines for the hydra config"; + description = "Extra lines for the Hydra configuration."; }; extraEnv = mkOption { type = types.attrsOf types.str; default = {}; - description = "Extra environment variables for Hydra"; + description = "Extra environment variables for Hydra."; }; }; @@ -144,6 +153,33 @@ in config = mkIf cfg.enable { + users.extraGroups.hydra = { }; + + users.extraUsers.hydra = + { description = "Hydra"; + group = "hydra"; + createHome = true; + home = baseDir; + useDefaultShell = true; + }; + + users.extraUsers.hydra-queue-runner = + { description = "Hydra queue runner"; + group = "hydra"; + useDefaultShell = true; + home = "${baseDir}/queue-runner"; # really only to keep SSH happy + }; + + users.extraUsers.hydra-www = + { description = "Hydra web server"; + group = "hydra"; + useDefaultShell = true; + }; + + nix.trustedUsers = [ "hydra-queue-runner" ]; + + services.hydra.package = mkDefault ((import ./release.nix {}).build.x86_64-linux); + services.hydra.extraConfig = '' using_frontend_proxy 1 @@ -159,13 +195,6 @@ in environment.variables = hydraEnv; - users.extraUsers.hydra = - { description = "Hydra"; - home = baseDir; - createHome = true; - useDefaultShell = true; - }; - nix.extraOptions = '' gc-keep-outputs = true gc-keep-derivations = true @@ -173,25 +202,28 @@ in # The default (`true') slows Nix down a lot since the build farm # has so many GC roots. gc-check-reachability = false - - # Hydra needs caching of build failures. - build-cache-failure = true - - build-poll-interval = 10 - - # Online log compression makes it impossible to get the tail of - # builds that are in progress. - build-compress-log = false ''; - systemd.services."hydra-init" = + systemd.services.hydra-init = { wantedBy = [ "multi-user.target" ]; + requires = optional haveLocalDB "postgresql.service"; + after = optional haveLocalDB "postgresql.service"; environment = env; preStart = '' - mkdir -m 0700 -p ${baseDir}/data - chown hydra ${baseDir}/data - ln -sf ${hydraConf} ${baseDir}/data/hydra.conf - ${optionalString (cfg.dbi == "dbi:Pg:dbname=hydra;user=hydra;") '' + mkdir -p ${baseDir} + chown hydra.hydra ${baseDir} + chmod 0750 ${baseDir} + + ln -sf ${hydraConf} ${baseDir}/hydra.conf + + mkdir -m 0700 -p ${baseDir}/www + chown hydra-www.hydra ${baseDir}/www + + mkdir -m 0700 -p ${baseDir}/queue-runner + mkdir -m 0750 -p ${baseDir}/build-logs + chown hydra-queue-runner.hydra ${baseDir}/queue-runner ${baseDir}/build-logs + + ${optionalString haveLocalDB '' if ! 
[ -e ${baseDir}/.db-created ]; then ${config.services.postgresql.package}/bin/createuser hydra ${config.services.postgresql.package}/bin/createdb -O hydra hydra @@ -204,41 +236,43 @@ in serviceConfig.User = "hydra"; serviceConfig.Type = "oneshot"; serviceConfig.RemainAfterExit = true; - } // (optionalAttrs (cfg.dbi == "dbi:Pg:dbname=hydra;user=hydra;") { - requires = [ "postgresql.service" ]; - after = [ "postgresql.service" ]; - }); + }; - systemd.services."hydra-server" = + systemd.services.hydra-server = { wantedBy = [ "multi-user.target" ]; requires = [ "hydra-init.service" ]; after = [ "hydra-init.service" ]; - environment = serverEnv // { COLUMNS = "80"; }; + environment = serverEnv; serviceConfig = { ExecStart = "@${cfg.package}/bin/hydra-server hydra-server -f -h '${cfg.listenHost}' " + "-p ${toString cfg.port} --max_spare_servers 5 --max_servers 25 " + "--max_requests 100 ${optionalString cfg.debugServer "-d"}"; - User = "hydra"; + User = "hydra-www"; + PermissionsStartOnly = true; Restart = "always"; }; }; - systemd.services."hydra-queue-runner" = + systemd.services.hydra-queue-runner = { wantedBy = [ "multi-user.target" ]; requires = [ "hydra-init.service" ]; after = [ "hydra-init.service" "network.target" ]; path = [ pkgs.nettools ]; - environment = env; + environment = env // { + PGPASSFILE = "${baseDir}/pgpass-queue-runner"; # grrr + IN_SYSTEMD = "1"; # to get log severity levels + }; serviceConfig = { ExecStartPre = "${cfg.package}/bin/hydra-queue-runner --unlock"; - ExecStart = "@${cfg.package}/bin/hydra-queue-runner hydra-queue-runner"; - User = "hydra"; + ExecStart = "@${cfg.package}/bin/hydra-queue-runner hydra-queue-runner -v"; + ExecStopPost = "${cfg.package}/bin/hydra-queue-runner --unlock"; + User = "hydra-queue-runner"; Restart = "always"; }; }; - systemd.services."hydra-evaluator" = + systemd.services.hydra-evaluator = { wantedBy = [ "multi-user.target" ]; requires = [ "hydra-init.service" ]; after = [ "hydra-init.service" "network.target" ]; @@ -251,7 +285,7 @@ in }; }; - systemd.services."hydra-update-gc-roots" = + systemd.services.hydra-update-gc-roots = { requires = [ "hydra-init.service" ]; after = [ "hydra-init.service" ]; environment = env; @@ -259,25 +293,51 @@ in { ExecStart = "@${cfg.package}/bin/hydra-update-gc-roots hydra-update-gc-roots"; User = "hydra"; }; - startAt = "02:15"; + startAt = "2,14:15"; }; - services.cron.systemCronJobs = - let - # If there is less than ... GiB of free disk space, stop the queue - # to prevent builds from failing or aborting. - checkSpace = pkgs.writeScript "hydra-check-space" + systemd.services.hydra-send-stats = + { wantedBy = [ "multi-user.target" ]; + after = [ "hydra-init.service" ]; + environment = env; + serviceConfig = + { ExecStart = "@${cfg.package}/bin/hydra-send-stats hydra-send-stats"; + User = "hydra"; + }; + }; + + # If there is less than a certain amount of free disk space, stop + # the queue/evaluator to prevent builds from failing or aborting. + systemd.services.hydra-check-space = + { script = '' - #! ${pkgs.stdenv.shell} if [ $(($(stat -f -c '%a' /nix/store) * $(stat -f -c '%S' /nix/store))) -lt $((${toString cfg.minimumDiskFree} * 1024**3)) ]; then + echo "stopping Hydra queue runner due to lack of free space..." systemctl stop hydra-queue-runner fi if [ $(($(stat -f -c '%a' /nix/store) * $(stat -f -c '%S' /nix/store))) -lt $((${toString cfg.minimumDiskFreeEvaluator} * 1024**3)) ]; then + echo "stopping Hydra evaluator due to lack of free space..." 
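            # (free space above = available blocks, stat -f '%a', times block size, '%S')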
systemctl stop hydra-evaluator fi ''; - in - [ "*/5 * * * * root ${checkSpace} &> ${baseDir}/data/checkspace.log" - ]; + startAt = "*:0/5"; + }; + + services.postgresql.enable = mkIf haveLocalDB true; + + services.postgresql.identMap = optionalString haveLocalDB + '' + hydra-users hydra hydra + hydra-users hydra-queue-runner hydra + hydra-users hydra-www hydra + hydra-users root hydra + ''; + + services.postgresql.authentication = optionalString haveLocalDB + '' + local hydra all ident map=hydra-users + ''; + }; + } diff --git a/release.nix b/release.nix index 4668efc3..b4685d26 100644 --- a/release.nix +++ b/release.nix @@ -13,6 +13,7 @@ let { imports = [ ./hydra-module.nix ]; virtualisation.memorySize = 1024; + virtualisation.writableStore = true; services.hydra.enable = true; services.hydra.package = hydraPkg; @@ -32,12 +33,12 @@ in rec { releaseTools.makeSourceTarball { name = "hydra-tarball"; - src = hydraSrc; + src = if lib.inNixShell then null else hydraSrc; inherit officialRelease; version = builtins.readFile ./version; buildInputs = - [ perl libxslt dblatex tetex nukeReferences pkgconfig nixUnstable git openssl ]; + [ perl libxslt nukeReferences pkgconfig nix git openssl ]; versionSuffix = if officialRelease then "" else "pre${toString hydraSrc.revCount}-${hydraSrc.gitTag}"; @@ -47,6 +48,7 @@ in rec { addToSearchPath PATH $(pwd)/src/script addToSearchPath PATH $(pwd)/src/hydra-eval-jobs + addToSearchPath PATH $(pwd)/src/hydra-queue-runner addToSearchPath PERL5LIB $(pwd)/src/lib ''; @@ -62,12 +64,9 @@ in rec { postDist = '' make -C doc/manual install prefix="$out" - nuke-refs "$out/share/doc/hydra/manual.pdf" echo "doc manual $out/share/doc/hydra manual.html" >> \ "$out/nix-support/hydra-build-products" - echo "doc-pdf manual $out/share/doc/hydra/manual.pdf" >> \ - "$out/nix-support/hydra-build-products" ''; }; @@ -80,6 +79,18 @@ in rec { nix = nixUnstable; + NetStatsd = buildPerlPackage { + name = "Net-Statsd-0.11"; + src = fetchurl { + url = mirror://cpan/authors/id/C/CO/COSIMO/Net-Statsd-0.11.tar.gz; + sha256 = "0f56c95846c7e65e6d32cec13ab9df65716429141f106d2dc587f1de1e09e163"; + }; + meta = { + description = "Sends statistics to the stats daemon over UDP"; + license = "perl"; + }; + }; + perlDeps = buildEnv { name = "hydra-perl-deps"; paths = with perlPackages; @@ -115,6 +126,7 @@ in rec { LWP LWPProtocolHttps NetAmazonS3 + NetStatsd PadWalker Readonly SQLSplitStatement @@ -136,15 +148,16 @@ in rec { src = tarball; buildInputs = - [ makeWrapper libtool unzip nukeReferences pkgconfig sqlite + [ makeWrapper libtool unzip nukeReferences pkgconfig sqlite libpqxx gitAndTools.topGit mercurial darcs subversion bazaar openssl bzip2 guile # optional, for Guile + Guix support perlDeps perl + postgresql92 # for running the tests ]; hydraPath = lib.makeSearchPath "bin" ( [ libxslt sqlite subversion openssh nix coreutils findutils - gzip bzip2 lzma gnutar unzip git gitAndTools.topGit mercurial darcs gnused graphviz bazaar + gzip bzip2 lzma gnutar unzip git gitAndTools.topGit mercurial darcs gnused bazaar ] ++ lib.optionals stdenv.isLinux [ rpm dpkg cdrkit ] ); preCheck = '' @@ -154,7 +167,6 @@ in rec { postInstall = '' mkdir -p $out/nix-support - nuke-refs $out/share/doc/hydra/manual/manual.pdf for i in $out/bin/*; do wrapProgram $i \ @@ -205,9 +217,8 @@ in rec { , "chown -R hydra /run/jobset /tmp/nix" ); - # Start the web interface with some weird settings. 
- $machine->succeed("systemctl stop hydra-server hydra-evaluator hydra-queue-runner"); - $machine->mustSucceed("su - hydra -c 'NIX_STORE_DIR=/tmp/nix/store NIX_LOG_DIR=/tmp/nix/var/log/nix NIX_STATE_DIR=/tmp/nix/var/nix NIX_REMOTE= DBIC_TRACE=1 hydra-server -d' >&2 &"); + $machine->succeed("systemctl stop hydra-evaluator hydra-queue-runner"); + $machine->waitForJob("hydra-server"); $machine->waitForOpenPort("3000"); # Run the API tests. diff --git a/src/Makefile.am b/src/Makefile.am index d91a1daa..a1936113 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1,3 +1,3 @@ -SUBDIRS = hydra-eval-jobs sql script lib root xsl ttf +SUBDIRS = hydra-eval-jobs hydra-queue-runner sql script lib root xsl ttf BOOTCLEAN_SUBDIRS = $(SUBDIRS) DIST_SUBDIRS = $(SUBDIRS) diff --git a/src/hydra-eval-jobs/hydra-eval-jobs.cc b/src/hydra-eval-jobs/hydra-eval-jobs.cc index 1f2c15c9..7fadef71 100644 --- a/src/hydra-eval-jobs/hydra-eval-jobs.cc +++ b/src/hydra-eval-jobs/hydra-eval-jobs.cc @@ -55,12 +55,12 @@ static void tryJobAlts(EvalState & state, JSONObject & top, } int n = 0; - foreach (ValueList::const_iterator, i, a->second) { + for (auto & i : a->second) { Bindings & actualArgs2(*state.allocBindings(actualArgs.size() + 1)); // !!! inefficient - for (auto & i: actualArgs) - actualArgs2.push_back(i); + for (auto & j : actualArgs) + actualArgs2.push_back(j); AutoArgs argsLeft2(argsLeft); - actualArgs2.push_back(Attr(cur->name, *i)); + actualArgs2.push_back(Attr(cur->name, i)); actualArgs2.sort(); // !!! inefficient argsLeft2.erase(cur->name); tryJobAlts(state, top, argsLeft2, attrPath, fun, next, last, actualArgs2); @@ -76,10 +76,10 @@ static string queryMetaStrings(EvalState & state, DrvInfo & drv, const string & state.forceValue(*v); if (v->type == tString) return v->string.s; - else if (v->type == tList) { + else if (v->isList()) { string res = ""; - for (unsigned int n = 0; n < v->list.length; ++n) { - Value v2(*v->list.elems[n]); + for (unsigned int n = 0; n < v->listSize(); ++n) { + Value v2(*v->listElems()[n]); state.forceValue(v2); if (v2.type == tString) { if (res.size() != 0) res += ", "; @@ -137,10 +137,10 @@ static void findJobsWrapped(EvalState & state, JSONObject & top, PathSet context; state.coerceToString(*a->pos, *a->value, context, true, false); PathSet drvs; - foreach (PathSet::iterator, i, context) - if (i->at(0) == '!') { - size_t index = i->find("!", 1); - drvs.insert(string(*i, index + 1)); + for (auto & i : context) + if (i.at(0) == '!') { + size_t index = i.find("!", 1); + drvs.insert(string(i, index + 1)); } res.attr("constituents", concatStringsSep(" ", drvs)); } @@ -164,9 +164,9 @@ static void findJobsWrapped(EvalState & state, JSONObject & top, else { if (!state.isDerivation(v)) { - foreach (Bindings::iterator, i, *v.attrs) - findJobs(state, top, argsLeft, *i->value, - (attrPath.empty() ? "" : attrPath + ".") + (string) i->name); + for (auto & i : *v.attrs) + findJobs(state, top, argsLeft, *i.value, + (attrPath.empty() ? 
"" : attrPath + ".") + (string) i.name); } } } diff --git a/src/hydra-queue-runner/Makefile.am b/src/hydra-queue-runner/Makefile.am new file mode 100644 index 00000000..089187d1 --- /dev/null +++ b/src/hydra-queue-runner/Makefile.am @@ -0,0 +1,8 @@ +bin_PROGRAMS = hydra-queue-runner + +hydra_queue_runner_SOURCES = hydra-queue-runner.cc queue-monitor.cc dispatcher.cc \ + builder.cc build-result.cc build-remote.cc \ + build-result.hh counter.hh pool.hh sync.hh token-server.hh state.hh db.hh +hydra_queue_runner_LDADD = $(NIX_LIBS) -lpqxx + +AM_CXXFLAGS = $(NIX_CFLAGS) -Wall diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc new file mode 100644 index 00000000..92a6fb28 --- /dev/null +++ b/src/hydra-queue-runner/build-remote.cc @@ -0,0 +1,281 @@ +#include + +#include +#include +#include + +#include "misc.hh" +#include "serve-protocol.hh" +#include "state.hh" +#include "util.hh" +#include "worker-protocol.hh" + +using namespace nix; + + +struct Child +{ + Pid pid; + AutoCloseFD to, from; +}; + + +static void append(Strings & dst, const Strings & src) +{ + dst.insert(dst.end(), src.begin(), src.end()); +} + + +static void openConnection(const string & sshName, const string & sshKey, + int stderrFD, Child & child) +{ + Pipe to, from; + to.create(); + from.create(); + + child.pid = startProcess([&]() { + + if (dup2(to.readSide, STDIN_FILENO) == -1) + throw SysError("cannot dup input pipe to stdin"); + + if (dup2(from.writeSide, STDOUT_FILENO) == -1) + throw SysError("cannot dup output pipe to stdout"); + + if (dup2(stderrFD, STDERR_FILENO) == -1) + throw SysError("cannot dup stderr"); + + Strings argv; + if (sshName == "localhost") + argv = {"nix-store", "--serve", "--write"}; + else { + argv = {"ssh", sshName}; + if (sshKey != "" && sshKey != "-") append(argv, {"-i", sshKey}); + append(argv, + { "-x", "-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes" + , "--", "nix-store", "--serve", "--write" }); + } + + execvp(argv.front().c_str(), (char * *) stringsToCharPtrs(argv).data()); // FIXME: remove cast + + throw SysError("cannot start ssh"); + }); + + to.readSide.close(); + from.writeSide.close(); + + child.to = to.writeSide.borrow(); + child.from = from.readSide.borrow(); +} + + +static void copyClosureTo(std::shared_ptr store, + FdSource & from, FdSink & to, const PathSet & paths, + counter & bytesSent, + bool useSubstitutes = false) +{ + PathSet closure; + for (auto & path : paths) + computeFSClosure(*store, path, closure); + + /* Send the "query valid paths" command with the "lock" option + enabled. This prevents a race where the remote host + garbage-collect paths that are already there. Optionally, ask + the remote host to substitute missing paths. */ + to << cmdQueryValidPaths << 1 << useSubstitutes << closure; + to.flush(); + + /* Get back the set of paths that are already valid on the remote + host. 
*/ + auto present = readStorePaths(from); + + if (present.size() == closure.size()) return; + + Paths sorted = topoSortPaths(*store, closure); + + Paths missing; + for (auto i = sorted.rbegin(); i != sorted.rend(); ++i) + if (present.find(*i) == present.end()) missing.push_back(*i); + + printMsg(lvlDebug, format("sending %1% missing paths") % missing.size()); + + for (auto & p : missing) + bytesSent += store->queryPathInfo(p).narSize; + + to << cmdImportPaths; + exportPaths(*store, missing, false, to); + to.flush(); + + if (readInt(from) != 1) + throw Error("remote machine failed to import closure"); +} + + +static void copyClosureFrom(std::shared_ptr store, + FdSource & from, FdSink & to, const PathSet & paths, counter & bytesReceived) +{ + to << cmdExportPaths << 0 << paths; + to.flush(); + store->importPaths(false, from); + + for (auto & p : paths) + bytesReceived += store->queryPathInfo(p).narSize; +} + + +void State::buildRemote(std::shared_ptr store, + Machine::ptr machine, Step::ptr step, + unsigned int maxSilentTime, unsigned int buildTimeout, + RemoteResult & result) +{ + string base = baseNameOf(step->drvPath); + result.logFile = logDir + "/" + string(base, 0, 2) + "/" + string(base, 2); + AutoDelete autoDelete(result.logFile, false); + + createDirs(dirOf(result.logFile)); + + AutoCloseFD logFD(open(result.logFile.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0666)); + if (logFD == -1) throw SysError(format("creating log file ‘%1%’") % result.logFile); + + Child child; + openConnection(machine->sshName, machine->sshKey, logFD, child); + + logFD.close(); + + FdSource from(child.from); + FdSink to(child.to); + + /* Handshake. */ + bool sendDerivation = true; + try { + to << SERVE_MAGIC_1 << SERVE_PROTOCOL_VERSION; + to.flush(); + + unsigned int magic = readInt(from); + if (magic != SERVE_MAGIC_2) + throw Error(format("protocol mismatch with ‘nix-store --serve’ on ‘%1%’") % machine->sshName); + unsigned int version = readInt(from); + if (GET_PROTOCOL_MAJOR(version) != 0x200) + throw Error(format("unsupported ‘nix-store --serve’ protocol version on ‘%1%’") % machine->sshName); + if (GET_PROTOCOL_MINOR(version) >= 1 && machine->sshName != "localhost") // FIXME + sendDerivation = false; + + } catch (EndOfFile & e) { + child.pid.wait(true); + + { + /* Disable this machine until a certain period of time has + passed. This period increases on every consecutive + failure. However, don't count failures that occurred + soon after the last one (to take into account steps + started in parallel). */ + auto info(machine->state->connectInfo.lock()); + auto now = std::chrono::system_clock::now(); + if (info->consecutiveFailures == 0 || info->lastFailure < now - std::chrono::seconds(30)) { + info->consecutiveFailures = std::min(info->consecutiveFailures + 1, (unsigned int) 4); + info->lastFailure = now; + int delta = retryInterval * powf(retryBackoff, info->consecutiveFailures - 1) + (rand() % 30); + printMsg(lvlInfo, format("will disable machine ‘%1%’ for %2%s") % machine->sshName % delta); + info->disabledUntil = now + std::chrono::seconds(delta); + } + } + + string s = chomp(readFile(result.logFile)); + throw Error(format("cannot connect to ‘%1%’: %2%") % machine->sshName % s); + } + + { + auto info(machine->state->connectInfo.lock()); + info->consecutiveFailures = 0; + } + + /* Gather the inputs. If the remote side is Nix <= 1.9, we have to + copy the entire closure of ‘drvPath’, as well as the required + outputs of the input derivations. 
On Nix > 1.9, we only need to + copy the immediate sources of the derivation and the required + outputs of the input derivations. */ + PathSet inputs; + BasicDerivation basicDrv(step->drv); + + if (sendDerivation) + inputs.insert(step->drvPath); + else + for (auto & p : step->drv.inputSrcs) + inputs.insert(p); + + for (auto & input : step->drv.inputDrvs) { + Derivation drv2 = readDerivation(input.first); + for (auto & name : input.second) { + auto i = drv2.outputs.find(name); + if (i == drv2.outputs.end()) continue; + inputs.insert(i->second.path); + basicDrv.inputSrcs.insert(i->second.path); + } + } + + /* Copy the input closure. */ + if (machine->sshName != "localhost") { + auto mc1 = std::make_shared(nrStepsWaiting); + std::lock_guard sendLock(machine->state->sendLock); + mc1.reset(); + MaintainCount mc2(nrStepsCopyingTo); + printMsg(lvlDebug, format("sending closure of ‘%1%’ to ‘%2%’") % step->drvPath % machine->sshName); + copyClosureTo(store, from, to, inputs, bytesSent); + } + + autoDelete.cancel(); + + /* Do the build. */ + printMsg(lvlDebug, format("building ‘%1%’ on ‘%2%’") % step->drvPath % machine->sshName); + + if (sendDerivation) + to << cmdBuildPaths << PathSet({step->drvPath}) << maxSilentTime << buildTimeout; + else + to << cmdBuildDerivation << step->drvPath << basicDrv << maxSilentTime << buildTimeout; + // FIXME: send maxLogSize. + to.flush(); + + result.startTime = time(0); + int res; + { + MaintainCount mc(nrStepsBuilding); + res = readInt(from); + } + result.stopTime = time(0); + + if (sendDerivation) { + if (res) { + result.errorMsg = (format("%1% on ‘%2%’") % readString(from) % machine->sshName).str(); + if (res == 100) result.status = BuildResult::PermanentFailure; + else if (res == 101) result.status = BuildResult::TimedOut; + else result.status = BuildResult::MiscFailure; + return; + } + result.status = BuildResult::Built; + } else { + result.status = (BuildResult::Status) res; + result.errorMsg = readString(from); + if (!result.success()) return; + } + + /* If the path was substituted or already valid, then we didn't + get a build log. */ + if (result.status == BuildResult::Substituted || result.status == BuildResult::AlreadyValid) { + unlink(result.logFile.c_str()); + result.logFile = ""; + } + + /* Copy the output paths. */ + if (machine->sshName != "localhost") { + printMsg(lvlDebug, format("copying outputs of ‘%1%’ from ‘%2%’") % step->drvPath % machine->sshName); + PathSet outputs; + for (auto & output : step->drv.outputs) + outputs.insert(output.second.path); + MaintainCount mc(nrStepsCopyingFrom); + copyClosureFrom(store, from, to, outputs, bytesReceived); + } + + /* Shut down the connection. */ + child.to.close(); + child.pid.wait(true); + +} diff --git a/src/hydra-queue-runner/build-result.cc b/src/hydra-queue-runner/build-result.cc new file mode 100644 index 00000000..5f516d23 --- /dev/null +++ b/src/hydra-queue-runner/build-result.cc @@ -0,0 +1,145 @@ +#include "build-result.hh" +#include "store-api.hh" +#include "misc.hh" +#include "util.hh" +#include "regex.hh" + +using namespace nix; + + +static std::tuple secureRead(Path fileName) +{ + auto fail = std::make_tuple(false, ""); + + if (!pathExists(fileName)) return fail; + + try { + /* For security, resolve symlinks. 
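           This also lets the check below reject files that point outside the Nix store.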
*/ + fileName = canonPath(fileName, true); + if (!isInStore(fileName)) return fail; + return std::make_tuple(true, readFile(fileName)); + } catch (Error & e) { return fail; } +} + + +BuildOutput getBuildOutput(std::shared_ptr store, const Derivation & drv) +{ + BuildOutput res; + + /* Compute the closure size. */ + PathSet outputs; + for (auto & output : drv.outputs) + outputs.insert(output.second.path); + PathSet closure; + for (auto & output : outputs) + computeFSClosure(*store, output, closure); + for (auto & path : closure) { + auto info = store->queryPathInfo(path); + res.closureSize += info.narSize; + if (outputs.find(path) != outputs.end()) res.size += info.narSize; + } + + /* Get build products. */ + bool explicitProducts = false; + + Regex regex( + "(([a-zA-Z0-9_-]+)" // type (e.g. "doc") + "[[:space:]]+" + "([a-zA-Z0-9_-]+)" // subtype (e.g. "readme") + "[[:space:]]+" + "(\"[^\"]+\"|[^[:space:]\"]+))" // path (may be quoted) + "([[:space:]]+([^[:space:]]+))?" // entry point + , true); + + for (auto & output : outputs) { + Path failedFile = output + "/nix-support/failed"; + if (pathExists(failedFile)) res.failed = true; + + auto file = secureRead(output + "/nix-support/hydra-build-products"); + if (!std::get<0>(file)) continue; + + explicitProducts = true; + + for (auto & line : tokenizeString(std::get<1>(file), "\n")) { + BuildProduct product; + + Regex::Subs subs; + if (!regex.matches(line, subs)) continue; + + product.type = subs[1]; + product.subtype = subs[2]; + product.path = subs[3][0] == '"' ? string(subs[3], 1, subs[3].size() - 2) : subs[3]; + product.defaultPath = subs[5]; + + /* Ensure that the path exists and points into the Nix + store. */ + if (product.path == "" || product.path[0] != '/') continue; + try { + product.path = canonPath(product.path, true); + } catch (Error & e) { continue; } + if (!isInStore(product.path) || !pathExists(product.path)) continue; + + /* FIXME: check that the path is in the input closure + of the build? */ + + product.name = product.path == output ? "" : baseNameOf(product.path); + + struct stat st; + if (stat(product.path.c_str(), &st)) + throw SysError(format("getting status of ‘%1%’") % product.path); + + if (S_ISREG(st.st_mode)) { + product.isRegular = true; + product.fileSize = st.st_size; + product.sha1hash = hashFile(htSHA1, product.path); + product.sha256hash = hashFile(htSHA256, product.path); + } + + res.products.push_back(product); + } + } + + /* If no build products were explicitly declared, then add all + outputs as a product of type "nix-build". */ + if (!explicitProducts) { + for (auto & output : drv.outputs) { + BuildProduct product; + product.path = output.second.path; + product.type = "nix-build"; + product.subtype = output.first == "out" ? "" : output.first; + product.name = storePathToName(product.path); + + struct stat st; + if (stat(product.path.c_str(), &st)) + throw SysError(format("getting status of ‘%1%’") % product.path); + if (S_ISDIR(st.st_mode)) + res.products.push_back(product); + } + } + + /* Get the release name from $output/nix-support/hydra-release-name. */ + for (auto & output : outputs) { + Path p = output + "/nix-support/hydra-release-name"; + if (!pathExists(p)) continue; + try { + res.releaseName = trim(readFile(p)); + } catch (Error & e) { continue; } + // FIXME: validate release name + } + + /* Get metrics. 
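       Each line of $out/nix-support/hydra-metrics holds ‘name value [unit]’.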
*/ + for (auto & output : outputs) { + auto file = secureRead(output + "/nix-support/hydra-metrics"); + for (auto & line : tokenizeString(std::get<1>(file), "\n")) { + auto fields = tokenizeString>(line); + if (fields.size() < 2) continue; + BuildMetric metric; + metric.name = fields[0]; // FIXME: validate + metric.value = atof(fields[1].c_str()); // FIXME + metric.unit = fields.size() >= 3 ? fields[2] : ""; + res.metrics[metric.name] = metric; + } + } + + return res; +} diff --git a/src/hydra-queue-runner/build-result.hh b/src/hydra-queue-runner/build-result.hh new file mode 100644 index 00000000..e40e7dcd --- /dev/null +++ b/src/hydra-queue-runner/build-result.hh @@ -0,0 +1,40 @@ +#pragma once + +#include + +#include "hash.hh" +#include "derivations.hh" + +struct BuildProduct +{ + nix::Path path, defaultPath; + std::string type, subtype, name; + bool isRegular = false; + nix::Hash sha1hash, sha256hash; + off_t fileSize = 0; + BuildProduct() { } +}; + +struct BuildMetric +{ + std::string name, unit; + double value; +}; + +struct BuildOutput +{ + /* Whether this build has failed with output, i.e., the build + finished with exit code 0 but produced a file + $out/nix-support/failed. */ + bool failed = false; + + std::string releaseName; + + unsigned long long closureSize = 0, size = 0; + + std::list products; + + std::map metrics; +}; + +BuildOutput getBuildOutput(std::shared_ptr store, const nix::Derivation & drv); diff --git a/src/hydra-queue-runner/builder.cc b/src/hydra-queue-runner/builder.cc new file mode 100644 index 00000000..2867f1f0 --- /dev/null +++ b/src/hydra-queue-runner/builder.cc @@ -0,0 +1,388 @@ +#include + +#include "state.hh" +#include "build-result.hh" + +using namespace nix; + + +void State::builder(Step::ptr step, Machine::ptr machine, std::shared_ptr reservation) +{ + bool retry = true; + + MaintainCount mc(nrActiveSteps); + + try { + auto store = openStore(); // FIXME: pool + retry = doBuildStep(store, step, machine); + } catch (std::exception & e) { + printMsg(lvlError, format("uncaught exception building ‘%1%’ on ‘%2%’: %3%") + % step->drvPath % machine->sshName % e.what()); + } + + /* Release the machine and wake up the dispatcher. */ + assert(reservation.unique()); + reservation = 0; + wakeDispatcher(); + + /* If there was a temporary failure, retry the step after an + exponentially increasing interval. */ + if (retry) { + { + auto step_(step->state.lock()); + step_->tries++; + nrRetries++; + if (step_->tries > maxNrRetries) maxNrRetries = step_->tries; // yeah yeah, not atomic + int delta = retryInterval * powf(retryBackoff, step_->tries - 1) + (rand() % 10); + printMsg(lvlInfo, format("will retry ‘%1%’ after %2%s") % step->drvPath % delta); + step_->after = std::chrono::system_clock::now() + std::chrono::seconds(delta); + } + + makeRunnable(step); + } +} + + +bool State::doBuildStep(std::shared_ptr store, Step::ptr step, + Machine::ptr machine) +{ + { + auto step_(step->state.lock()); + assert(step_->created); + assert(!step->finished); + } + + /* There can be any number of builds in the database that depend + on this derivation. Arbitrarily pick one (though preferring a + build of which this is the top-level derivation) for the + purpose of creating build steps. We could create a build step + record for every build, but that could be very expensive + (e.g. a stdenv derivation can be a dependency of tens of + thousands of builds), so we don't. 
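       The build picked here is used for the BuildSteps records and for failure propagation below.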
*/ + Build::ptr build; + + { + std::set dependents; + std::set steps; + getDependents(step, dependents, steps); + + if (dependents.empty()) { + /* Apparently all builds that depend on this derivation + are gone (e.g. cancelled). So don't bother. This is + very unlikely to happen, because normally Steps are + only kept alive by being reachable from a + Build. However, it's possible that a new Build just + created a reference to this step. So to handle that + possibility, we retry this step (putting it back in + the runnable queue). If there are really no strong + pointers to the step, it will be deleted. */ + printMsg(lvlInfo, format("maybe cancelling build step ‘%1%’") % step->drvPath); + return true; + } + + for (auto build2 : dependents) + if (build2->drvPath == step->drvPath) { build = build2; break; } + + if (!build) build = *dependents.begin(); + + printMsg(lvlInfo, format("performing step ‘%1%’ on ‘%2%’ (needed by build %3% and %4% others)") + % step->drvPath % machine->sshName % build->id % (dependents.size() - 1)); + } + + bool quit = build->id == buildOne; + + auto conn(dbPool.get()); + + RemoteResult result; + BuildOutput res; + int stepNr = 0; + + time_t stepStartTime = result.startTime = time(0); + + /* If any of the outputs have previously failed, then don't bother + building again. */ + bool cachedFailure = checkCachedFailure(step, *conn); + + if (cachedFailure) + result.status = BuildResult::CachedFailure; + else { + + /* Create a build step record indicating that we started + building. Also, mark the selected build as busy. */ + { + pqxx::work txn(*conn); + stepNr = createBuildStep(txn, result.startTime, build, step, machine->sshName, bssBusy); + txn.parameterized("update Builds set busy = 1 where id = $1")(build->id).exec(); + txn.commit(); + } + + /* Do the build. */ + try { + /* FIXME: referring builds may have conflicting timeouts. */ + buildRemote(store, machine, step, build->maxSilentTime, build->buildTimeout, result); + } catch (Error & e) { + result.status = BuildResult::MiscFailure; + result.errorMsg = e.msg(); + } + + if (result.success()) res = getBuildOutput(store, step->drv); + } + + time_t stepStopTime = time(0); + if (!result.stopTime) result.stopTime = stepStopTime; + + /* Account the time we spent building this step by dividing it + among the jobsets that depend on it. */ + { + auto step_(step->state.lock()); + // FIXME: loss of precision. + time_t charge = (result.stopTime - result.startTime) / step_->jobsets.size(); + for (auto & jobset : step_->jobsets) + jobset->addStep(result.startTime, charge); + } + + /* Asynchronously compress the log. */ + if (result.logFile != "") { + { + auto logCompressorQueue_(logCompressorQueue.lock()); + logCompressorQueue_->push(result.logFile); + } + logCompressorWakeup.notify_one(); + } + + /* The step had a hopefully temporary failure (e.g. network + issue). Retry a number of times. */ + if (result.canRetry()) { + printMsg(lvlError, format("possibly transient failure building ‘%1%’ on ‘%2%’: %3%") + % step->drvPath % machine->sshName % result.errorMsg); + bool retry; + { + auto step_(step->state.lock()); + retry = step_->tries + 1 < maxTries; + } + if (retry) { + pqxx::work txn(*conn); + finishBuildStep(txn, result.startTime, result.stopTime, build->id, + stepNr, machine->sshName, bssAborted, result.errorMsg); + txn.commit(); + if (quit) exit(1); + return true; + } + } + + if (result.success()) { + + /* Register success in the database for all Build objects that + have this step as the top-level step. 
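       (Several queued builds may share the same top-level derivation.)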
Since the queue + monitor thread may be creating new referring Builds + concurrently, and updating the database may fail, we do + this in a loop, marking all known builds, repeating until + there are no unmarked builds. + */ + + std::vector buildIDs; + + while (true) { + + /* Get the builds that have this one as the top-level. */ + std::vector direct; + { + auto steps_(steps.lock()); + auto step_(step->state.lock()); + + for (auto & b_ : step_->builds) { + auto b = b_.lock(); + if (b && !b->finishedInDB) direct.push_back(b); + } + + /* If there are no builds left to update in the DB, + then we're done (except for calling + finishBuildStep()). Delete the step from + ‘steps’. Since we've been holding the ‘steps’ lock, + no new referrers can have been added in the + meantime or be added afterwards. */ + if (direct.empty()) { + printMsg(lvlDebug, format("finishing build step ‘%1%’") % step->drvPath); + steps_->erase(step->drvPath); + } + } + + /* Update the database. */ + { + pqxx::work txn(*conn); + + finishBuildStep(txn, result.startTime, result.stopTime, build->id, stepNr, machine->sshName, bssSuccess); + + for (auto & b : direct) + markSucceededBuild(txn, b, res, build != b || result.status != BuildResult::Built, + result.startTime, result.stopTime); + + txn.commit(); + } + + if (direct.empty()) break; + + /* Remove the direct dependencies from ‘builds’. This will + cause them to be destroyed. */ + for (auto & b : direct) { + auto builds_(builds.lock()); + b->finishedInDB = true; + builds_->erase(b->id); + buildIDs.push_back(b->id); + } + } + + /* Send notification about the builds that have this step as + the top-level. */ + for (auto id : buildIDs) { + { + auto notificationSenderQueue_(notificationSenderQueue.lock()); + notificationSenderQueue_->push(NotificationItem(id, std::vector())); + } + notificationSenderWakeup.notify_one(); + } + + /* Wake up any dependent steps that have no other + dependencies. */ + { + auto step_(step->state.lock()); + for (auto & rdepWeak : step_->rdeps) { + auto rdep = rdepWeak.lock(); + if (!rdep) continue; + + bool runnable = false; + { + auto rdep_(rdep->state.lock()); + rdep_->deps.erase(step); + /* Note: if the step has not finished + initialisation yet, it will be made runnable in + createStep(), if appropriate. */ + if (rdep_->deps.empty() && rdep_->created) runnable = true; + } + + if (runnable) makeRunnable(rdep); + } + } + + } else { + + /* Register failure in the database for all Build objects that + directly or indirectly depend on this step. */ + + std::vector dependentIDs; + + while (true) { + + /* Get the builds and steps that depend on this step. */ + std::set indirect; + { + auto steps_(steps.lock()); + std::set steps; + getDependents(step, indirect, steps); + + /* If there are no builds left, delete all referring + steps from ‘steps’. As for the success case, we can + be certain no new referrers can be added. */ + if (indirect.empty()) { + for (auto & s : steps) { + printMsg(lvlDebug, format("finishing build step ‘%1%’") % s->drvPath); + steps_->erase(s->drvPath); + } + break; + } + } + + /* Update the database. */ + { + pqxx::work txn(*conn); + + BuildStatus buildStatus = + result.status == BuildResult::TimedOut ? bsTimedOut : + result.canRetry() ? bsAborted : + bsFailed; + BuildStepStatus buildStepStatus = + result.status == BuildResult::TimedOut ? bssTimedOut : + result.canRetry() ? bssAborted : + bssFailed; + + /* For standard failures, we don't care about the error + message. 
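           It is cleared here so that the build steps created below store a null message.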
*/ + if (result.status == BuildResult::PermanentFailure || + result.status == BuildResult::TransientFailure || + result.status == BuildResult::CachedFailure || + result.status == BuildResult::TimedOut) + result.errorMsg = ""; + + /* Create failed build steps for every build that depends + on this. For cached failures, only create a step for + builds that don't have this step as top-level + (otherwise the user won't be able to see what caused + the build to fail). */ + for (auto & build2 : indirect) { + if ((cachedFailure && build2->drvPath == step->drvPath) || + (!cachedFailure && build == build2) || + build2->finishedInDB) + continue; + createBuildStep(txn, 0, build2, step, machine->sshName, + buildStepStatus, result.errorMsg, build == build2 ? 0 : build->id); + } + + if (!cachedFailure) + finishBuildStep(txn, result.startTime, result.stopTime, build->id, + stepNr, machine->sshName, buildStepStatus, result.errorMsg); + + /* Mark all builds that depend on this derivation as failed. */ + for (auto & build2 : indirect) { + if (build2->finishedInDB) continue; + printMsg(lvlError, format("marking build %1% as failed") % build2->id); + txn.parameterized + ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $4, isCachedBuild = $5 where id = $1 and finished = 0") + (build2->id) + ((int) (build2->drvPath != step->drvPath && buildStatus == bsFailed ? bsDepFailed : buildStatus)) + (result.startTime) + (result.stopTime) + (cachedFailure ? 1 : 0).exec(); + nrBuildsDone++; + } + + /* Remember failed paths in the database so that they + won't be built again. */ + if (!cachedFailure && result.status == BuildResult::PermanentFailure) + for (auto & path : outputPaths(step->drv)) + txn.parameterized("insert into FailedPaths values ($1)")(path).exec(); + + txn.commit(); + } + + /* Remove the indirect dependencies from ‘builds’. This + will cause them to be destroyed. */ + for (auto & b : indirect) { + auto builds_(builds.lock()); + b->finishedInDB = true; + builds_->erase(b->id); + dependentIDs.push_back(b->id); + if (buildOne == b->id) quit = true; + } + } + + /* Send notification about this build and its dependents. */ + { + auto notificationSenderQueue_(notificationSenderQueue.lock()); + notificationSenderQueue_->push(NotificationItem(build->id, dependentIDs)); + } + notificationSenderWakeup.notify_one(); + + } + + // FIXME: keep stats about aborted steps? 
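    /* These statistics are ‘counter’ values, i.e. std::atomic (see
       counter.hh), so they can be updated without holding a lock. */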
+    nrStepsDone++;
+    totalStepTime += stepStopTime - stepStartTime;
+    totalStepBuildTime += result.stopTime - result.startTime;
+    machine->state->nrStepsDone++;
+    machine->state->totalStepTime += stepStopTime - stepStartTime;
+    machine->state->totalStepBuildTime += result.stopTime - result.startTime;
+
+    if (quit) exit(0); // testing hack
+
+    return false;
+}
diff --git a/src/hydra-queue-runner/counter.hh b/src/hydra-queue-runner/counter.hh
new file mode 100644
index 00000000..912cb499
--- /dev/null
+++ b/src/hydra-queue-runner/counter.hh
@@ -0,0 +1,12 @@
+#pragma once
+
+#include <atomic>
+
+typedef std::atomic<unsigned long> counter;
+
+struct MaintainCount
+{
+    counter & c;
+    MaintainCount(counter & c) : c(c) { c++; }
+    ~MaintainCount() { c--; }
+};
diff --git a/src/hydra-queue-runner/db.hh b/src/hydra-queue-runner/db.hh
new file mode 100644
index 00000000..9b5b7348
--- /dev/null
+++ b/src/hydra-queue-runner/db.hh
@@ -0,0 +1,38 @@
+#pragma once
+
+#include <pqxx/pqxx>
+
+#include "util.hh"
+
+
+struct Connection : pqxx::connection
+{
+    Connection() : pqxx::connection(getFlags()) { };
+
+    std::string getFlags()
+    {
+        using namespace nix;
+        auto s = getEnv("HYDRA_DBI", "dbi:Pg:dbname=hydra;");
+        std::string prefix = "dbi:Pg:";
+        if (std::string(s, 0, prefix.size()) != prefix)
+            throw Error("$HYDRA_DBI does not denote a PostgreSQL database");
+        return concatStringsSep(" ", tokenizeString<Strings>(string(s, prefix.size()), ";"));
+    }
+};
+
+
+struct receiver : public pqxx::notification_receiver
+{
+    bool status = false;
+    receiver(pqxx::connection_base & c, const std::string & channel)
+        : pqxx::notification_receiver(c, channel) { }
+    void operator() (const std::string & payload, int pid) override
+    {
+        status = true;
+    };
+    bool get() {
+        bool b = status;
+        status = false;
+        return b;
+    }
+};
diff --git a/src/hydra-queue-runner/dispatcher.cc b/src/hydra-queue-runner/dispatcher.cc
new file mode 100644
index 00000000..bb50948f
--- /dev/null
+++ b/src/hydra-queue-runner/dispatcher.cc
@@ -0,0 +1,268 @@
+#include <algorithm>
+#include <thread>
+
+#include "state.hh"
+
+using namespace nix;
+
+
+void State::makeRunnable(Step::ptr step)
+{
+    printMsg(lvlChatty, format("step ‘%1%’ is now runnable") % step->drvPath);
+
+    {
+        auto step_(step->state.lock());
+        assert(step_->created);
+        assert(!step->finished);
+        assert(step_->deps.empty());
+    }
+
+    {
+        auto runnable_(runnable.lock());
+        runnable_->push_back(step);
+    }
+
+    wakeDispatcher();
+}
+
+
+void State::dispatcher()
+{
+    while (true) {
+        printMsg(lvlDebug, "dispatcher woken up");
+
+        auto sleepUntil = doDispatch();
+
+        /* Sleep until we're woken up (either because a runnable build
+           is added, or because a build finishes). */
+        {
+            auto dispatcherWakeup_(dispatcherWakeup.lock());
+            if (!*dispatcherWakeup_) {
+                printMsg(lvlDebug, format("dispatcher sleeping for %1%s") %
+                    std::chrono::duration_cast<std::chrono::seconds>(sleepUntil - std::chrono::system_clock::now()).count());
+                dispatcherWakeup_.wait_until(dispatcherWakeupCV, sleepUntil);
+            }
+            nrDispatcherWakeups++;
+            *dispatcherWakeup_ = false;
+        }
+    }
+
+    printMsg(lvlError, "dispatcher exits");
+}
+
+
+system_time State::doDispatch()
+{
+    /* Prune old historical build step info from the jobsets.
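       Steps that fall outside the scheduling window no longer count
       towards a jobset's used share.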
+    */
+    {
+        auto jobsets_(jobsets.lock());
+        for (auto & jobset : *jobsets_) {
+            auto s1 = jobset.second->shareUsed();
+            jobset.second->pruneSteps();
+            auto s2 = jobset.second->shareUsed();
+            if (s1 != s2)
+                printMsg(lvlDebug, format("pruned scheduling window of ‘%1%:%2%’ from %3% to %4%")
+                    % jobset.first.first % jobset.first.second % s1 % s2);
+        }
+    }
+
+    /* Start steps until we're out of steps or slots. */
+    auto sleepUntil = system_time::max();
+    bool keepGoing;
+
+    do {
+        system_time now = std::chrono::system_clock::now();
+
+        /* Copy the currentJobs field of each machine. This is
+           necessary to ensure that the sort comparator below is
+           an ordering. std::sort() can segfault if it isn't. Also
+           filter out temporarily disabled machines. */
+        struct MachineInfo
+        {
+            Machine::ptr machine;
+            unsigned int currentJobs;
+        };
+        std::vector<MachineInfo> machinesSorted;
+        {
+            auto machines_(machines.lock());
+            for (auto & m : *machines_) {
+                auto info(m.second->state->connectInfo.lock());
+                if (info->consecutiveFailures && info->disabledUntil > now) {
+                    if (info->disabledUntil < sleepUntil)
+                        sleepUntil = info->disabledUntil;
+                    continue;
+                }
+                machinesSorted.push_back({m.second, m.second->state->currentJobs});
+            }
+        }
+
+        /* Sort the machines by a combination of speed factor and
+           available slots. Prioritise the available machines as
+           follows:
+
+           - First by load divided by speed factor, rounded to the
+             nearest integer. This causes fast machines to be
+             preferred over slow machines with similar loads.
+
+           - Then by speed factor.
+
+           - Finally by load. */
+        sort(machinesSorted.begin(), machinesSorted.end(),
+            [](const MachineInfo & a, const MachineInfo & b) -> bool
+            {
+                float ta = roundf(a.currentJobs / a.machine->speedFactor);
+                float tb = roundf(b.currentJobs / b.machine->speedFactor);
+                return
+                    ta != tb ? ta < tb :
+                    a.machine->speedFactor != b.machine->speedFactor ? a.machine->speedFactor > b.machine->speedFactor :
+                    a.currentJobs > b.currentJobs;
+            });
+
+        /* Sort the runnable steps by priority. Priority is established
+           as follows (in order of precedence):
+
+           - The global priority of the builds that depend on the
+             step. This allows admins to bump a build to the front of
+             the queue.
+
+           - The lowest used scheduling share of the jobsets depending
+             on the step.
+
+           - The local priority of the build, as set via the build's
+             meta.schedulingPriority field. Note that this is not
+             quite correct: the local priority should only be used to
+             establish priority between builds in the same jobset, but
+             here it's used between steps in different jobsets if they
+             happen to have the same lowest used scheduling share. But
+             that's not very likely.
+
+           - The lowest ID of the builds depending on the step;
+             i.e. older builds take priority over new ones.
+
+           FIXME: O(n lg n); obviously, it would be better to keep a
+           runnable queue sorted by priority. */
+        std::vector<Step::ptr> runnableSorted;
+        {
+            auto runnable_(runnable.lock());
+            runnableSorted.reserve(runnable_->size());
+            for (auto i = runnable_->begin(); i != runnable_->end(); ) {
+                auto step = i->lock();
+
+                /* Remove dead steps. */
+                if (!step) {
+                    i = runnable_->erase(i);
+                    continue;
+                }
+
+                ++i;
+
+                /* Skip previously failed steps that aren't ready
+                   to be retried.
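                   Their retry time also bounds how long the dispatcher
                   may sleep below.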
*/ + { + auto step_(step->state.lock()); + if (step_->tries > 0 && step_->after > now) { + if (step_->after < sleepUntil) + sleepUntil = step_->after; + continue; + } + } + + runnableSorted.push_back(step); + } + } + + for (auto & step : runnableSorted) { + auto step_(step->state.lock()); + step_->lowestShareUsed = 1e9; + for (auto & jobset : step_->jobsets) + step_->lowestShareUsed = std::min(step_->lowestShareUsed, jobset->shareUsed()); + } + + sort(runnableSorted.begin(), runnableSorted.end(), + [](const Step::ptr & a, const Step::ptr & b) + { + auto a_(a->state.lock()); + auto b_(b->state.lock()); // FIXME: deadlock? + return + a_->highestGlobalPriority != b_->highestGlobalPriority ? a_->highestGlobalPriority > b_->highestGlobalPriority : + a_->lowestShareUsed != b_->lowestShareUsed ? a_->lowestShareUsed < b_->lowestShareUsed : + a_->highestLocalPriority != b_->highestLocalPriority ? a_->highestLocalPriority > b_->highestLocalPriority : + a_->lowestBuildID < b_->lowestBuildID; + }); + + /* Find a machine with a free slot and find a step to run + on it. Once we find such a pair, we restart the outer + loop because the machine sorting will have changed. */ + keepGoing = false; + + for (auto & mi : machinesSorted) { + if (mi.machine->state->currentJobs >= mi.machine->maxJobs) continue; + + for (auto & step : runnableSorted) { + + /* Can this machine do this step? */ + if (!mi.machine->supportsStep(step)) continue; + + /* Let's do this step. Remove it from the runnable + list. FIXME: O(n). */ + { + auto runnable_(runnable.lock()); + bool removed = false; + for (auto i = runnable_->begin(); i != runnable_->end(); ) + if (i->lock() == step) { + i = runnable_->erase(i); + removed = true; + break; + } else ++i; + assert(removed); + } + + /* Make a slot reservation and start a thread to + do the build. */ + auto reservation = std::make_shared(mi.machine->state->currentJobs); + + auto builderThread = std::thread(&State::builder, this, step, mi.machine, reservation); + builderThread.detach(); // FIXME? 
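                    /* The detached thread drops its machine reservation and
                       calls wakeDispatcher() itself when it finishes (see
                       State::builder in builder.cc), so there is nothing to
                       join here. */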
+ + keepGoing = true; + break; + } + + if (keepGoing) break; + } + + } while (keepGoing); + + return sleepUntil; +} + + +void State::wakeDispatcher() +{ + { + auto dispatcherWakeup_(dispatcherWakeup.lock()); + *dispatcherWakeup_ = true; + } + dispatcherWakeupCV.notify_one(); +} + + +void Jobset::addStep(time_t startTime, time_t duration) +{ + auto steps_(steps.lock()); + (*steps_)[startTime] = duration; + seconds += duration; +} + + +void Jobset::pruneSteps() +{ + time_t now = time(0); + auto steps_(steps.lock()); + while (!steps_->empty()) { + auto i = steps_->begin(); + if (i->first > now - schedulingWindow) break; + seconds -= i->second; + steps_->erase(i); + } +} diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc new file mode 100644 index 00000000..3fe608fc --- /dev/null +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -0,0 +1,641 @@ +#include +#include + +#include +#include +#include + +#include "state.hh" +#include "build-result.hh" + +#include "shared.hh" +#include "globals.hh" +#include "value-to-json.hh" + +using namespace nix; + + +State::State() +{ + hydraData = getEnv("HYDRA_DATA"); + if (hydraData == "") throw Error("$HYDRA_DATA must be set"); + + logDir = canonPath(hydraData + "/build-logs"); + + machinesFile = getEnv("NIX_REMOTE_SYSTEMS", "/etc/nix/machines"); + machinesFileStat.st_ino = 0; + machinesFileStat.st_mtime = 0; + + localPlatforms = {settings.thisSystem}; + if (settings.thisSystem == "x86_64-linux") + localPlatforms.insert("i686-linux"); +} + + +void State::loadMachinesFile() +{ + string contents; + if (pathExists(machinesFile)) { + struct stat st; + if (stat(machinesFile.c_str(), &st) != 0) + throw SysError(format("getting stats about ‘%1%’") % machinesFile); + if (st.st_ino == machinesFileStat.st_ino && st.st_mtime == machinesFileStat.st_mtime) + return; + printMsg(lvlDebug, "reloading machines"); + contents = readFile(machinesFile); + machinesFileStat = st; + } else { + contents = "localhost " + concatStringsSep(",", localPlatforms) + + " - " + int2String(settings.maxBuildJobs) + " 1"; + } + + Machines newMachines, oldMachines; + { + auto machines_(machines.lock()); + oldMachines = *machines_; + } + + for (auto line : tokenizeString(contents, "\n")) { + line = trim(string(line, 0, line.find('#'))); + auto tokens = tokenizeString>(line); + if (tokens.size() < 3) continue; + tokens.resize(7); + + auto machine = std::make_shared(); + machine->sshName = tokens[0]; + machine->systemTypes = tokenizeString(tokens[1], ","); + machine->sshKey = tokens[2]; + if (tokens[3] != "") + string2Int(tokens[3], machine->maxJobs); + else + machine->maxJobs = 1; + machine->speedFactor = atof(tokens[4].c_str()); + if (tokens[5] == "-") tokens[5] = ""; + machine->supportedFeatures = tokenizeString(tokens[5], ","); + if (tokens[6] == "-") tokens[6] = ""; + machine->mandatoryFeatures = tokenizeString(tokens[6], ","); + for (auto & f : machine->mandatoryFeatures) + machine->supportedFeatures.insert(f); + + /* Re-use the State object of the previous machine with the + same name. */ + auto i = oldMachines.find(machine->sshName); + if (i == oldMachines.end()) + printMsg(lvlChatty, format("adding new machine ‘%1%’") % machine->sshName); + else + printMsg(lvlChatty, format("updating machine ‘%1%’") % machine->sshName); + machine->state = i == oldMachines.end() + ? 
std::make_shared() + : i->second->state; + newMachines[machine->sshName] = machine; + } + + for (auto & m : oldMachines) + if (newMachines.find(m.first) == newMachines.end()) + printMsg(lvlInfo, format("removing machine ‘%1%’") % m.first); + + auto machines_(machines.lock()); + *machines_ = newMachines; +} + + +void State::monitorMachinesFile() +{ + while (true) { + try { + // FIXME: use inotify. + sleep(60); + loadMachinesFile(); + } catch (std::exception & e) { + printMsg(lvlError, format("reloading machines file: %1%") % e.what()); + } + } +} + + +void State::clearBusy(Connection & conn, time_t stopTime) +{ + pqxx::work txn(conn); + txn.parameterized + ("update BuildSteps set busy = 0, status = $1, stopTime = $2 where busy = 1") + ((int) bssAborted) + (stopTime, stopTime != 0).exec(); + txn.exec("update Builds set busy = 0 where finished = 0 and busy = 1"); + txn.commit(); +} + + +int State::createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step, + const std::string & machine, BuildStepStatus status, const std::string & errorMsg, BuildID propagatedFrom) +{ + /* Acquire an exclusive lock on BuildSteps to ensure that we don't + race with other threads creating a step of the same build. */ + txn.exec("lock table BuildSteps in exclusive mode"); + + auto res = txn.parameterized("select max(stepnr) from BuildSteps where build = $1")(build->id).exec(); + int stepNr = res[0][0].is_null() ? 1 : res[0][0].as() + 1; + + txn.parameterized + ("insert into BuildSteps (build, stepnr, type, drvPath, busy, startTime, system, status, propagatedFrom, errorMsg, stopTime, machine) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)") + (build->id)(stepNr)(0)(step->drvPath)(status == bssBusy ? 1 : 0) + (startTime, startTime != 0) + (step->drv.platform) + ((int) status, status != bssBusy) + (propagatedFrom, propagatedFrom != 0) + (errorMsg, errorMsg != "") + (startTime, startTime != 0 && status != bssBusy) + (machine).exec(); + + for (auto & output : step->drv.outputs) + txn.parameterized + ("insert into BuildStepOutputs (build, stepnr, name, path) values ($1, $2, $3, $4)") + (build->id)(stepNr)(output.first)(output.second.path).exec(); + + return stepNr; +} + + +void State::finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, BuildID buildId, int stepNr, + const std::string & machine, BuildStepStatus status, const std::string & errorMsg, BuildID propagatedFrom) +{ + assert(startTime); + assert(stopTime); + txn.parameterized + ("update BuildSteps set busy = 0, status = $1, propagatedFrom = $4, errorMsg = $5, startTime = $6, stopTime = $7, machine = $8 where build = $2 and stepnr = $3") + ((int) status)(buildId)(stepNr) + (propagatedFrom, propagatedFrom != 0) + (errorMsg, errorMsg != "") + (startTime)(stopTime) + (machine, machine != "").exec(); +} + + +/* Get the steps and unfinished builds that depend on the given step. */ +void getDependents(Step::ptr step, std::set & builds, std::set & steps) +{ + std::function visit; + + visit = [&](Step::ptr step) { + if (has(steps, step)) return; + steps.insert(step); + + std::vector rdeps; + + { + auto step_(step->state.lock()); + + for (auto & build : step_->builds) { + auto build_ = build.lock(); + if (build_ && !build_->finishedInDB) builds.insert(build_); + } + + /* Make a copy of rdeps so that we don't hold the lock for + very long. 
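           The recursive visits below take each rdep's own state lock.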
*/ + rdeps = step_->rdeps; + } + + for (auto & rdep : rdeps) { + auto rdep_ = rdep.lock(); + if (rdep_) visit(rdep_); + } + }; + + visit(step); +} + + +void visitDependencies(std::function visitor, Step::ptr start) +{ + std::set queued; + std::queue todo; + todo.push(start); + + while (!todo.empty()) { + auto step = todo.front(); + todo.pop(); + + visitor(step); + + auto state(step->state.lock()); + for (auto & dep : state->deps) + if (queued.find(dep) == queued.end()) { + queued.insert(dep); + todo.push(dep); + } + } +} + + +void State::markSucceededBuild(pqxx::work & txn, Build::ptr build, + const BuildOutput & res, bool isCachedBuild, time_t startTime, time_t stopTime) +{ + printMsg(lvlInfo, format("marking build %1% as succeeded") % build->id); + + if (build->finishedInDB) return; + + if (txn.parameterized("select 1 from Builds where id = $1 and finished = 0")(build->id).exec().empty()) return; + + txn.parameterized + ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $4, size = $5, closureSize = $6, releaseName = $7, isCachedBuild = $8 where id = $1") + (build->id) + ((int) (res.failed ? bsFailedWithOutput : bsSuccess)) + (startTime) + (stopTime) + (res.size) + (res.closureSize) + (res.releaseName, res.releaseName != "") + (isCachedBuild ? 1 : 0).exec(); + + txn.parameterized("delete from BuildProducts where build = $1")(build->id).exec(); + + unsigned int productNr = 1; + for (auto & product : res.products) { + txn.parameterized + ("insert into BuildProducts (build, productnr, type, subtype, fileSize, sha1hash, sha256hash, path, name, defaultPath) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)") + (build->id) + (productNr++) + (product.type) + (product.subtype) + (product.fileSize, product.isRegular) + (printHash(product.sha1hash), product.isRegular) + (printHash(product.sha256hash), product.isRegular) + (product.path) + (product.name) + (product.defaultPath).exec(); + } + + txn.parameterized("delete from BuildMetrics where build = $1")(build->id).exec(); + + for (auto & metric : res.metrics) { + txn.parameterized + ("insert into BuildMetrics (build, name, unit, value, project, jobset, job, timestamp) values ($1, $2, $3, $4, $5, $6, $7, $8)") + (build->id) + (metric.second.name) + (metric.second.unit, metric.second.unit != "") + (metric.second.value) + (build->projectName) + (build->jobsetName) + (build->jobName) + (build->timestamp).exec(); + } + + nrBuildsDone++; +} + + +bool State::checkCachedFailure(Step::ptr step, Connection & conn) +{ + pqxx::work txn(conn); + for (auto & path : outputPaths(step->drv)) + if (!txn.parameterized("select 1 from FailedPaths where path = $1")(path).exec().empty()) + return true; + return false; +} + + +void State::logCompressor() +{ + while (true) { + try { + + Path logPath; + { + auto logCompressorQueue_(logCompressorQueue.lock()); + while (logCompressorQueue_->empty()) + logCompressorQueue_.wait(logCompressorWakeup); + logPath = logCompressorQueue_->front(); + logCompressorQueue_->pop(); + } + + if (!pathExists(logPath)) continue; + + printMsg(lvlChatty, format("compressing log file ‘%1%’") % logPath); + + Path tmpPath = logPath + ".bz2.tmp"; + + AutoCloseFD fd = open(tmpPath.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0644); + + // FIXME: use libbz2 + + Pid pid = startProcess([&]() { + if (dup2(fd, STDOUT_FILENO) == -1) + throw SysError("cannot dup output pipe to stdout"); + execlp("bzip2", "bzip2", "-c", logPath.c_str(), nullptr); + throw SysError("cannot start bzip2"); + }); + + int res = pid.wait(true); + + 
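+            // Replace the plain-text log only if bzip2 exited successfully.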
if (res != 0) + throw Error(format("bzip2 returned exit code %1% while compressing ‘%2%’") + % res % logPath); + + if (rename(tmpPath.c_str(), (logPath + ".bz2").c_str()) != 0) + throw SysError(format("renaming ‘%1%’") % tmpPath); + + if (unlink(logPath.c_str()) != 0) + throw SysError(format("unlinking ‘%1%’") % logPath); + + } catch (std::exception & e) { + printMsg(lvlError, format("log compressor: %1%") % e.what()); + sleep(5); + } + } +} + + +void State::notificationSender() +{ + while (true) { + try { + + NotificationItem item; + { + auto notificationSenderQueue_(notificationSenderQueue.lock()); + while (notificationSenderQueue_->empty()) + notificationSenderQueue_.wait(notificationSenderWakeup); + item = notificationSenderQueue_->front(); + notificationSenderQueue_->pop(); + } + + printMsg(lvlChatty, format("sending notification about build %1%") % item.first); + + Pid pid = startProcess([&]() { + Strings argv({"hydra-notify", "build", int2String(item.first)}); + for (auto id : item.second) + argv.push_back(int2String(id)); + execvp("hydra-notify", (char * *) stringsToCharPtrs(argv).data()); // FIXME: remove cast + throw SysError("cannot start hydra-notify"); + }); + + int res = pid.wait(true); + + if (res != 0) + throw Error(format("hydra-build returned exit code %1% notifying about build %2%") + % res % item.first); + + } catch (std::exception & e) { + printMsg(lvlError, format("notification sender: %1%") % e.what()); + sleep(5); + } + } +} + + +std::shared_ptr State::acquireGlobalLock() +{ + Path lockPath = hydraData + "/queue-runner/lock"; + + createDirs(dirOf(lockPath)); + + auto lock = std::make_shared(); + if (!lock->lockPaths(PathSet({lockPath}), "", false)) return 0; + + return lock; +} + + +void State::dumpStatus(Connection & conn, bool log) +{ + std::ostringstream out; + + { + JSONObject root(out); + time_t now = time(0); + root.attr("status", "up"); + root.attr("time", time(0)); + root.attr("uptime", now - startedAt); + root.attr("pid", getpid()); + { + auto builds_(builds.lock()); + root.attr("nrQueuedBuilds", builds_->size()); + } + { + auto steps_(steps.lock()); + for (auto i = steps_->begin(); i != steps_->end(); ) + if (i->second.lock()) ++i; else i = steps_->erase(i); + root.attr("nrUnfinishedSteps", steps_->size()); + } + { + auto runnable_(runnable.lock()); + for (auto i = runnable_->begin(); i != runnable_->end(); ) + if (i->lock()) ++i; else i = runnable_->erase(i); + root.attr("nrRunnableSteps", runnable_->size()); + } + root.attr("nrActiveSteps", nrActiveSteps); + root.attr("nrStepsBuilding", nrStepsBuilding); + root.attr("nrStepsCopyingTo", nrStepsCopyingTo); + root.attr("nrStepsCopyingFrom", nrStepsCopyingFrom); + root.attr("nrStepsWaiting", nrStepsWaiting); + root.attr("bytesSent"); out << bytesSent; + root.attr("bytesReceived"); out << bytesReceived; + root.attr("nrBuildsRead", nrBuildsRead); + root.attr("nrBuildsDone", nrBuildsDone); + root.attr("nrStepsDone", nrStepsDone); + root.attr("nrRetries", nrRetries); + root.attr("maxNrRetries", maxNrRetries); + if (nrStepsDone) { + root.attr("totalStepTime", totalStepTime); + root.attr("totalStepBuildTime", totalStepBuildTime); + root.attr("avgStepTime"); out << (float) totalStepTime / nrStepsDone; + root.attr("avgStepBuildTime"); out << (float) totalStepBuildTime / nrStepsDone; + } + root.attr("nrQueueWakeups", nrQueueWakeups); + root.attr("nrDispatcherWakeups", nrDispatcherWakeups); + root.attr("nrDbConnections", dbPool.count()); + { + root.attr("machines"); + JSONObject nested(out); + auto 
machines_(machines.lock()); + for (auto & i : *machines_) { + auto & m(i.second); + auto & s(m->state); + nested.attr(m->sshName); + JSONObject nested2(out); + nested2.attr("currentJobs", s->currentJobs); + nested2.attr("nrStepsDone", s->nrStepsDone); + if (m->state->nrStepsDone) { + nested2.attr("totalStepTime", s->totalStepTime); + nested2.attr("totalStepBuildTime", s->totalStepBuildTime); + nested2.attr("avgStepTime"); out << (float) s->totalStepTime / s->nrStepsDone; + nested2.attr("avgStepBuildTime"); out << (float) s->totalStepBuildTime / s->nrStepsDone; + } + } + } + { + root.attr("jobsets"); + JSONObject nested(out); + auto jobsets_(jobsets.lock()); + for (auto & jobset : *jobsets_) { + nested.attr(jobset.first.first + ":" + jobset.first.second); + JSONObject nested2(out); + nested2.attr("shareUsed"); out << jobset.second->shareUsed(); + nested2.attr("seconds", jobset.second->getSeconds()); + } + } + } + + if (log) printMsg(lvlInfo, format("status: %1%") % out.str()); + + { + pqxx::work txn(conn); + // FIXME: use PostgreSQL 9.5 upsert. + txn.exec("delete from SystemStatus where what = 'queue-runner'"); + txn.parameterized("insert into SystemStatus values ('queue-runner', $1)")(out.str()).exec(); + txn.exec("notify status_dumped"); + txn.commit(); + } +} + + +void State::showStatus() +{ + auto conn(dbPool.get()); + receiver statusDumped(*conn, "status_dumped"); + + string status; + bool barf = false; + + /* Get the last JSON status dump from the database. */ + { + pqxx::work txn(*conn); + auto res = txn.exec("select status from SystemStatus where what = 'queue-runner'"); + if (res.size()) status = res[0][0].as(); + } + + if (status != "") { + + /* If the status is not empty, then the queue runner is + running. Ask it to update the status dump. */ + { + pqxx::work txn(*conn); + txn.exec("notify dump_status"); + txn.commit(); + } + + /* Wait until it has done so. */ + barf = conn->await_notification(5, 0) == 0; + + /* Get the new status. */ + { + pqxx::work txn(*conn); + auto res = txn.exec("select status from SystemStatus where what = 'queue-runner'"); + if (res.size()) status = res[0][0].as(); + } + + } + + if (status == "") status = R"({"status":"down"})"; + + std::cout << status << "\n"; + + if (barf) + throw Error("queue runner did not respond; status information may be wrong"); +} + + +void State::unlock() +{ + auto lock = acquireGlobalLock(); + if (!lock) + throw Error("hydra-queue-runner is currently running"); + + auto conn(dbPool.get()); + + clearBusy(*conn, 0); + + { + pqxx::work txn(*conn); + txn.exec("delete from SystemStatus where what = 'queue-runner'"); + txn.commit(); + } +} + + +void State::run(BuildID buildOne) +{ + startedAt = time(0); + this->buildOne = buildOne; + + auto lock = acquireGlobalLock(); + if (!lock) + throw Error("hydra-queue-runner is already running"); + + { + auto conn(dbPool.get()); + clearBusy(*conn, 0); + dumpStatus(*conn, false); + } + + loadMachinesFile(); + + std::thread(&State::monitorMachinesFile, this).detach(); + + std::thread(&State::queueMonitor, this).detach(); + + std::thread(&State::dispatcher, this).detach(); + + /* Run a log compressor thread. If needed, we could start more + than one. */ + std::thread(&State::logCompressor, this).detach(); + + /* Idem for notification sending. */ + std::thread(&State::notificationSender, this).detach(); + + /* Monitor the database for status dump requests (e.g. from + ‘hydra-queue-runner --status’). 
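+       On error, the database connection is re-established after a
+       short sleep, so a transient outage only delays the next dump.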
*/ + while (true) { + try { + auto conn(dbPool.get()); + receiver dumpStatus(*conn, "dump_status"); + while (true) { + bool timeout = conn->await_notification(300, 0) == 0; + State::dumpStatus(*conn, timeout); + } + } catch (std::exception & e) { + printMsg(lvlError, format("main thread: %1%") % e.what()); + sleep(10); // probably a DB problem, so don't retry right away + } + } +} + + +int main(int argc, char * * argv) +{ + return handleExceptions(argv[0], [&]() { + initNix(); + + signal(SIGINT, SIG_DFL); + signal(SIGTERM, SIG_DFL); + signal(SIGHUP, SIG_DFL); + + bool unlock = false; + bool status = false; + BuildID buildOne = 0; + + parseCmdLine(argc, argv, [&](Strings::iterator & arg, const Strings::iterator & end) { + if (*arg == "--unlock") + unlock = true; + else if (*arg == "--status") + status = true; + else if (*arg == "--build-one") { + if (!string2Int(getArg(*arg, arg, end), buildOne)) + throw Error("‘--build-one’ requires a build ID"); + } else + return false; + return true; + }); + + settings.buildVerbosity = lvlVomit; + settings.useSubstitutes = false; + settings.lockCPU = false; + + State state; + if (status) + state.showStatus(); + else if (unlock) + state.unlock(); + else + state.run(buildOne); + }); +} diff --git a/src/hydra-queue-runner/pool.hh b/src/hydra-queue-runner/pool.hh new file mode 100644 index 00000000..a1cd3977 --- /dev/null +++ b/src/hydra-queue-runner/pool.hh @@ -0,0 +1,85 @@ +#pragma once + +#include +#include + +#include "sync.hh" + +/* This template class implements a simple pool manager of resources + of some type R, such as database connections. It is used as + follows: + + class Connection { ... }; + + Pool pool; + + { + auto conn(pool.get()); + conn->exec("select ..."); + } + + Here, the Connection object referenced by ‘conn’ is automatically + returned to the pool when ‘conn’ goes out of scope. +*/ + +template +class Pool +{ +private: + struct State + { + unsigned int count = 0; + std::list> idle; + }; + + Sync state; + +public: + + class Handle + { + private: + Pool & pool; + std::shared_ptr r; + + friend Pool; + + Handle(Pool & pool, std::shared_ptr r) : pool(pool), r(r) { } + + public: + Handle(Handle && h) : pool(h.pool), r(h.r) { h.r.reset(); } + + Handle(const Handle & l) = delete; + + ~Handle() + { + auto state_(pool.state.lock()); + if (r) state_->idle.push_back(r); + } + + R * operator -> () { return r.get(); } + R & operator * () { return *r; } + }; + + Handle get() + { + { + auto state_(state.lock()); + if (!state_->idle.empty()) { + auto p = state_->idle.back(); + state_->idle.pop_back(); + return Handle(*this, p); + } + state_->count++; + } + /* Note: we don't hold the lock while creating a new instance, + because creation might take a long time. 
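+           If creation throws, ‘count’ stays incremented, so count()
+           is an upper bound on the number of resources that
+           actually exist.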
*/ + return Handle(*this, std::make_shared()); + } + + unsigned int count() + { + auto state_(state.lock()); + return state_->count; + } +}; diff --git a/src/hydra-queue-runner/queue-monitor.cc b/src/hydra-queue-runner/queue-monitor.cc new file mode 100644 index 00000000..3be08897 --- /dev/null +++ b/src/hydra-queue-runner/queue-monitor.cc @@ -0,0 +1,466 @@ +#include "state.hh" +#include "build-result.hh" + +using namespace nix; + + +void State::queueMonitor() +{ + while (true) { + try { + queueMonitorLoop(); + } catch (std::exception & e) { + printMsg(lvlError, format("queue monitor: %1%") % e.what()); + sleep(10); // probably a DB problem, so don't retry right away + } + } +} + + +void State::queueMonitorLoop() +{ + auto conn(dbPool.get()); + + receiver buildsAdded(*conn, "builds_added"); + receiver buildsRestarted(*conn, "builds_restarted"); + receiver buildsCancelled(*conn, "builds_cancelled"); + receiver buildsDeleted(*conn, "builds_deleted"); + receiver buildsBumped(*conn, "builds_bumped"); + receiver jobsetSharesChanged(*conn, "jobset_shares_changed"); + + auto store = openStore(); // FIXME: pool + + unsigned int lastBuildId = 0; + + while (true) { + getQueuedBuilds(*conn, store, lastBuildId); + + /* Sleep until we get notification from the database about an + event. */ + conn->await_notification(); + nrQueueWakeups++; + + if (buildsAdded.get()) + printMsg(lvlTalkative, "got notification: new builds added to the queue"); + if (buildsRestarted.get()) { + printMsg(lvlTalkative, "got notification: builds restarted"); + lastBuildId = 0; // check all builds + } + if (buildsCancelled.get() || buildsDeleted.get() || buildsBumped.get()) { + printMsg(lvlTalkative, "got notification: builds cancelled or bumped"); + processQueueChange(*conn); + } + if (jobsetSharesChanged.get()) { + printMsg(lvlTalkative, "got notification: jobset shares changed"); + processJobsetSharesChange(*conn); + } + } +} + + +void State::getQueuedBuilds(Connection & conn, std::shared_ptr store, unsigned int & lastBuildId) +{ + printMsg(lvlInfo, format("checking the queue for builds > %1%...") % lastBuildId); + + /* Grab the queued builds from the database, but don't process + them yet (since we don't want a long-running transaction). 
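+       Creating the steps involves slow Nix store queries.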
*/ + std::vector newIDs; + std::map newBuildsByID; + std::multimap newBuildsByPath; + + { + pqxx::work txn(conn); + + auto res = txn.parameterized + ("select id, project, jobset, job, drvPath, maxsilent, timeout, timestamp, globalPriority, priority from Builds " + "where id > $1 and finished = 0 order by globalPriority desc, id") + (lastBuildId).exec(); + + for (auto const & row : res) { + auto builds_(builds.lock()); + BuildID id = row["id"].as(); + if (buildOne && id != buildOne) continue; + if (id > lastBuildId) lastBuildId = id; + if (has(*builds_, id)) continue; + + auto build = std::make_shared(); + build->id = id; + build->drvPath = row["drvPath"].as(); + build->projectName = row["project"].as(); + build->jobsetName = row["jobset"].as(); + build->jobName = row["job"].as(); + build->maxSilentTime = row["maxsilent"].as(); + build->buildTimeout = row["timeout"].as(); + build->timestamp = row["timestamp"].as(); + build->globalPriority = row["globalPriority"].as(); + build->localPriority = row["priority"].as(); + build->jobset = createJobset(txn, build->projectName, build->jobsetName); + + newIDs.push_back(id); + newBuildsByID[id] = build; + newBuildsByPath.emplace(std::make_pair(build->drvPath, id)); + } + } + + std::set newRunnable; + unsigned int nrAdded; + std::function createBuild; + + createBuild = [&](Build::ptr build) { + printMsg(lvlTalkative, format("loading build %1% (%2%)") % build->id % build->fullJobName()); + nrAdded++; + newBuildsByID.erase(build->id); + + if (!store->isValidPath(build->drvPath)) { + /* Derivation has been GC'ed prematurely. */ + printMsg(lvlError, format("aborting GC'ed build %1%") % build->id); + if (!build->finishedInDB) { + pqxx::work txn(conn); + txn.parameterized + ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1 and finished = 0") + (build->id) + ((int) bsAborted) + (time(0)) + ("derivation was garbage-collected prior to build").exec(); + txn.commit(); + build->finishedInDB = true; + nrBuildsDone++; + } + return; + } + + std::set newSteps; + std::set finishedDrvs; // FIXME: re-use? + Step::ptr step = createStep(store, build->drvPath, build, 0, finishedDrvs, newSteps, newRunnable); + + /* Some of the new steps may be the top level of builds that + we haven't processed yet. So do them now. This ensures that + if build A depends on build B with top-level step X, then X + will be "accounted" to B in doBuildStep(). */ + for (auto & r : newSteps) { + auto i = newBuildsByPath.find(r->drvPath); + if (i == newBuildsByPath.end()) continue; + auto j = newBuildsByID.find(i->second); + if (j == newBuildsByID.end()) continue; + createBuild(j->second); + } + + /* If we didn't get a step, it means the step's outputs are + all valid. So we mark this as a finished, cached build. */ + if (!step) { + Derivation drv = readDerivation(build->drvPath); + BuildOutput res = getBuildOutput(store, drv); + + pqxx::work txn(conn); + time_t now = time(0); + markSucceededBuild(txn, build, res, true, now, now); + txn.commit(); + + build->finishedInDB = true; + + return; + } + + /* If any step has an unsupported system type or has a + previously failed output path, then fail the build right + away. */ + bool badStep = false; + for (auto & r : newSteps) { + BuildStatus buildStatus = bsSuccess; + BuildStepStatus buildStepStatus = bssFailed; + + if (checkCachedFailure(r, conn)) { + printMsg(lvlError, format("marking build %1% as cached failure") % build->id); + buildStatus = step == r ? 
bsFailed : bsDepFailed; + buildStepStatus = bssFailed; + } + + if (buildStatus == bsSuccess) { + bool supported = false; + { + auto machines_(machines.lock()); // FIXME: use shared_mutex + for (auto & m : *machines_) + if (m.second->supportsStep(r)) { supported = true; break; } + } + + if (!supported) { + printMsg(lvlError, format("aborting unsupported build %1%") % build->id); + buildStatus = bsUnsupported; + buildStepStatus = bssUnsupported; + } + } + + if (buildStatus != bsSuccess) { + time_t now = time(0); + if (!build->finishedInDB) { + pqxx::work txn(conn); + createBuildStep(txn, 0, build, r, "", buildStepStatus); + txn.parameterized + ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $3, isCachedBuild = $4 where id = $1 and finished = 0") + (build->id) + ((int) buildStatus) + (now) + (buildStatus != bsUnsupported ? 1 : 0).exec(); + txn.commit(); + build->finishedInDB = true; + nrBuildsDone++; + } + badStep = true; + break; + } + } + + if (badStep) return; + + /* Note: if we exit this scope prior to this, the build and + all newly created steps are destroyed. */ + + { + auto builds_(builds.lock()); + if (!build->finishedInDB) // FIXME: can this happen? + (*builds_)[build->id] = build; + build->toplevel = step; + } + + build->propagatePriorities(); + + printMsg(lvlChatty, format("added build %1% (top-level step %2%, %3% new steps)") + % build->id % step->drvPath % newSteps.size()); + }; + + /* Now instantiate build steps for each new build. The builder + threads can start building the runnable build steps right away, + even while we're still processing other new builds. */ + for (auto id : newIDs) { + auto i = newBuildsByID.find(id); + if (i == newBuildsByID.end()) continue; + auto build = i->second; + + newRunnable.clear(); + nrAdded = 0; + try { + createBuild(build); + } catch (Error & e) { + e.addPrefix(format("while loading build %1%: ") % build->id); + throw; + } + + /* Add the new runnable build steps to ‘runnable’ and wake up + the builder threads. */ + printMsg(lvlChatty, format("got %1% new runnable steps from %2% new builds") % newRunnable.size() % nrAdded); + for (auto & r : newRunnable) + makeRunnable(r); + + nrBuildsRead += nrAdded; + } +} + + +void Build::propagatePriorities() +{ + /* Update the highest global priority and lowest build ID fields + of each dependency. This is used by the dispatcher to start + steps in order of descending global priority and ascending + build ID. */ + visitDependencies([&](const Step::ptr & step) { + auto step_(step->state.lock()); + step_->highestGlobalPriority = std::max(step_->highestGlobalPriority, globalPriority); + step_->highestLocalPriority = std::max(step_->highestLocalPriority, localPriority); + step_->lowestBuildID = std::min(step_->lowestBuildID, id); + step_->jobsets.insert(jobset); + }, toplevel); +} + + +void State::processQueueChange(Connection & conn) +{ + /* Get the current set of queued builds. */ + std::map currentIds; + { + pqxx::work txn(conn); + auto res = txn.exec("select id, globalPriority from Builds where finished = 0"); + for (auto const & row : res) + currentIds[row["id"].as()] = row["globalPriority"].as(); + } + + auto builds_(builds.lock()); + + for (auto i = builds_->begin(); i != builds_->end(); ) { + auto b = currentIds.find(i->first); + if (b == currentIds.end()) { + printMsg(lvlInfo, format("discarding cancelled build %1%") % i->first); + i = builds_->erase(i); + // FIXME: ideally we would interrupt active build steps here. 
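+            // Until then, a step that is already running just finishes
+            // on its own; its result is not recorded for this build.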
+ continue; + } + if (i->second->globalPriority < b->second) { + printMsg(lvlInfo, format("priority of build %1% increased") % i->first); + i->second->globalPriority = b->second; + i->second->propagatePriorities(); + } + ++i; + } +} + + +Step::ptr State::createStep(std::shared_ptr store, const Path & drvPath, + Build::ptr referringBuild, Step::ptr referringStep, std::set & finishedDrvs, + std::set & newSteps, std::set & newRunnable) +{ + if (finishedDrvs.find(drvPath) != finishedDrvs.end()) return 0; + + /* Check if the requested step already exists. If not, create a + new step. In any case, make the step reachable from + referringBuild or referringStep. This is done atomically (with + ‘steps’ locked), to ensure that this step can never become + reachable from a new build after doBuildStep has removed it + from ‘steps’. */ + Step::ptr step; + bool isNew = false; + { + auto steps_(steps.lock()); + + /* See if the step already exists in ‘steps’ and is not + stale. */ + auto prev = steps_->find(drvPath); + if (prev != steps_->end()) { + step = prev->second.lock(); + /* Since ‘step’ is a strong pointer, the referred Step + object won't be deleted after this. */ + if (!step) steps_->erase(drvPath); // remove stale entry + } + + /* If it doesn't exist, create it. */ + if (!step) { + step = std::make_shared(); + step->drvPath = drvPath; + isNew = true; + } + + auto step_(step->state.lock()); + + assert(step_->created != isNew); + + if (referringBuild) + step_->builds.push_back(referringBuild); + + if (referringStep) + step_->rdeps.push_back(referringStep); + + (*steps_)[drvPath] = step; + } + + if (!isNew) return step; + + printMsg(lvlDebug, format("considering derivation ‘%1%’") % drvPath); + + /* Initialize the step. Note that the step may be visible in + ‘steps’ before this point, but that doesn't matter because + it's not runnable yet, and other threads won't make it + runnable while step->created == false. */ + step->drv = readDerivation(drvPath); + { + auto i = step->drv.env.find("requiredSystemFeatures"); + if (i != step->drv.env.end()) + step->requiredSystemFeatures = tokenizeString>(i->second); + } + + auto attr = step->drv.env.find("preferLocalBuild"); + step->preferLocalBuild = + attr != step->drv.env.end() && attr->second == "1" + && has(localPlatforms, step->drv.platform); + + /* Are all outputs valid? */ + bool valid = true; + for (auto & i : step->drv.outputs) { + if (!store->isValidPath(i.second.path)) { + valid = false; + break; + } + } + + // FIXME: check whether all outputs are in the binary cache. + if (valid) { + finishedDrvs.insert(drvPath); + return 0; + } + + /* No, we need to build. */ + printMsg(lvlDebug, format("creating build step ‘%1%’") % drvPath); + newSteps.insert(step); + + /* Create steps for the dependencies. */ + for (auto & i : step->drv.inputDrvs) { + auto dep = createStep(store, i.first, 0, step, finishedDrvs, newSteps, newRunnable); + if (dep) { + auto step_(step->state.lock()); + step_->deps.insert(dep); + } + } + + /* If the step has no (remaining) dependencies, make it + runnable. 
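+       Otherwise it becomes runnable once its last dependency finishes.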
*/ + { + auto step_(step->state.lock()); + assert(!step_->created); + step_->created = true; + if (step_->deps.empty()) + newRunnable.insert(step); + } + + return step; +} + + +Jobset::ptr State::createJobset(pqxx::work & txn, + const std::string & projectName, const std::string & jobsetName) +{ + auto p = std::make_pair(projectName, jobsetName); + + { + auto jobsets_(jobsets.lock()); + auto i = jobsets_->find(p); + if (i != jobsets_->end()) return i->second; + } + + auto res = txn.parameterized + ("select schedulingShares from Jobsets where project = $1 and name = $2") + (projectName)(jobsetName).exec(); + if (res.empty()) throw Error("missing jobset - can't happen"); + + auto shares = res[0]["schedulingShares"].as(); + + auto jobset = std::make_shared(); + jobset->setShares(shares); + + /* Load the build steps from the last 24 hours. */ + res = txn.parameterized + ("select s.startTime, s.stopTime from BuildSteps s join Builds b on build = id " + "where s.startTime is not null and s.stopTime > $1 and project = $2 and jobset = $3") + (time(0) - Jobset::schedulingWindow * 10)(projectName)(jobsetName).exec(); + for (auto const & row : res) { + time_t startTime = row["startTime"].as(); + time_t stopTime = row["stopTime"].as(); + jobset->addStep(startTime, stopTime - startTime); + } + + auto jobsets_(jobsets.lock()); + // Can't happen because only this thread adds to "jobsets". + assert(jobsets_->find(p) == jobsets_->end()); + (*jobsets_)[p] = jobset; + return jobset; +} + + +void State::processJobsetSharesChange(Connection & conn) +{ + /* Get the current set of jobsets. */ + pqxx::work txn(conn); + auto res = txn.exec("select project, name, schedulingShares from Jobsets"); + for (auto const & row : res) { + auto jobsets_(jobsets.lock()); + auto i = jobsets_->find(std::make_pair(row["project"].as(), row["name"].as())); + if (i == jobsets_->end()) continue; + i->second->setShares(row["schedulingShares"].as()); + } +} diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh new file mode 100644 index 00000000..060e2d1c --- /dev/null +++ b/src/hydra-queue-runner/state.hh @@ -0,0 +1,416 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "db.hh" +#include "counter.hh" +#include "pathlocks.hh" +#include "pool.hh" +#include "sync.hh" + +#include "store-api.hh" +#include "derivations.hh" + + +typedef unsigned int BuildID; + +typedef std::chrono::time_point system_time; + + +typedef enum { + bsSuccess = 0, + bsFailed = 1, + bsDepFailed = 2, + bsAborted = 3, + bsFailedWithOutput = 6, + bsTimedOut = 7, + bsUnsupported = 9, +} BuildStatus; + + +typedef enum { + bssSuccess = 0, + bssFailed = 1, + bssAborted = 4, + bssTimedOut = 7, + bssCachedFailure = 8, + bssUnsupported = 9, + bssBusy = 100, // not stored +} BuildStepStatus; + + +struct RemoteResult : nix::BuildResult +{ + time_t startTime = 0, stopTime = 0; + nix::Path logFile; + + bool canRetry() + { + return status == TransientFailure || status == MiscFailure; + } +}; + + +struct Step; +struct BuildOutput; + + +class Jobset +{ +public: + + typedef std::shared_ptr ptr; + typedef std::weak_ptr wptr; + + static const time_t schedulingWindow = 24 * 60 * 60; + +private: + + std::atomic seconds{0}; + std::atomic shares{1}; + + /* The start time and duration of the most recent build steps. 
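+       Maps step start time to duration in seconds; pruneSteps()
+       evicts entries older than ‘schedulingWindow’.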
*/ + Sync> steps; + +public: + + double shareUsed() + { + return (double) seconds / shares; + } + + void setShares(int shares_) + { + assert(shares_ > 0); + shares = shares_; + } + + time_t getSeconds() { return seconds; } + + void addStep(time_t startTime, time_t duration); + + void pruneSteps(); +}; + + +struct Build +{ + typedef std::shared_ptr ptr; + typedef std::weak_ptr wptr; + + BuildID id; + nix::Path drvPath; + std::map outputs; + std::string projectName, jobsetName, jobName; + time_t timestamp; + unsigned int maxSilentTime, buildTimeout; + int localPriority, globalPriority; + + std::shared_ptr toplevel; + + Jobset::ptr jobset; + + std::atomic_bool finishedInDB{false}; + + std::string fullJobName() + { + return projectName + ":" + jobsetName + ":" + jobName; + } + + void propagatePriorities(); +}; + + +struct Step +{ + typedef std::shared_ptr ptr; + typedef std::weak_ptr wptr; + + nix::Path drvPath; + nix::Derivation drv; + std::set requiredSystemFeatures; + bool preferLocalBuild; + + struct State + { + /* Whether the step has finished initialisation. */ + bool created = false; + + /* The build steps on which this step depends. */ + std::set deps; + + /* The build steps that depend on this step. */ + std::vector rdeps; + + /* Builds that have this step as the top-level derivation. */ + std::vector builds; + + /* Jobsets to which this step belongs. Used for determining + scheduling priority. */ + std::set jobsets; + + /* Number of times we've tried this step. */ + unsigned int tries = 0; + + /* Point in time after which the step can be retried. */ + system_time after; + + /* The highest global priority of any build depending on this + step. */ + int highestGlobalPriority{0}; + + /* The lowest share used of any jobset depending on this + step. */ + double lowestShareUsed; + + /* The highest local priority of any build depending on this + step. */ + int highestLocalPriority{0}; + + /* The lowest ID of any build depending on this step. */ + BuildID lowestBuildID{std::numeric_limits::max()}; + }; + + std::atomic_bool finished{false}; // debugging + + Sync state; + + ~Step() + { + //printMsg(lvlError, format("destroying step %1%") % drvPath); + } +}; + + +void getDependents(Step::ptr step, std::set & builds, std::set & steps); + +/* Call ‘visitor’ for a step and all its dependencies. */ +void visitDependencies(std::function visitor, Step::ptr step); + + +struct Machine +{ + typedef std::shared_ptr ptr; + + std::string sshName, sshKey; + std::set systemTypes, supportedFeatures, mandatoryFeatures; + unsigned int maxJobs = 1; + float speedFactor = 1.0; + + struct State { + typedef std::shared_ptr ptr; + counter currentJobs{0}; + counter nrStepsDone{0}; + counter totalStepTime{0}; // total time for steps, including closure copying + counter totalStepBuildTime{0}; // total build time for steps + + struct ConnectInfo + { + system_time lastFailure, disabledUntil; + unsigned int consecutiveFailures; + }; + Sync connectInfo; + + /* Mutex to prevent multiple threads from sending data to the + same machine (which would be inefficient). 
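+           Copies to distinct machines still proceed in parallel.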
*/ + std::mutex sendLock; + }; + + State::ptr state; + + bool supportsStep(Step::ptr step) + { + if (systemTypes.find(step->drv.platform) == systemTypes.end()) return false; + for (auto & f : mandatoryFeatures) + if (step->requiredSystemFeatures.find(f) == step->requiredSystemFeatures.end() + && !(step->preferLocalBuild && f == "local")) + return false; + for (auto & f : step->requiredSystemFeatures) + if (supportedFeatures.find(f) == supportedFeatures.end()) return false; + return true; + } +}; + + +class State +{ +private: + + // FIXME: Make configurable. + const unsigned int maxTries = 5; + const unsigned int retryInterval = 60; // seconds + const float retryBackoff = 3.0; + const unsigned int maxParallelCopyClosure = 4; + + nix::Path hydraData, logDir; + + nix::StringSet localPlatforms; + + /* The queued builds. */ + typedef std::map Builds; + Sync builds; + + /* The jobsets. */ + typedef std::map, Jobset::ptr> Jobsets; + Sync jobsets; + + /* All active or pending build steps (i.e. dependencies of the + queued builds). Note that these are weak pointers. Steps are + kept alive by being reachable from Builds or by being in + progress. */ + typedef std::map Steps; + Sync steps; + + /* Build steps that have no unbuilt dependencies. */ + typedef std::list Runnable; + Sync runnable; + + /* CV for waking up the dispatcher. */ + Sync dispatcherWakeup; + std::condition_variable_any dispatcherWakeupCV; + + /* PostgreSQL connection pool. */ + Pool dbPool; + + /* The build machines. */ + typedef std::map Machines; + Sync machines; // FIXME: use atomic_shared_ptr + + nix::Path machinesFile; + struct stat machinesFileStat; + + /* Various stats. */ + time_t startedAt; + counter nrBuildsRead{0}; + counter nrBuildsDone{0}; + counter nrStepsDone{0}; + counter nrActiveSteps{0}; + counter nrStepsBuilding{0}; + counter nrStepsCopyingTo{0}; + counter nrStepsCopyingFrom{0}; + counter nrStepsWaiting{0}; + counter nrRetries{0}; + counter maxNrRetries{0}; + counter totalStepTime{0}; // total time for steps, including closure copying + counter totalStepBuildTime{0}; // total build time for steps + counter nrQueueWakeups{0}; + counter nrDispatcherWakeups{0}; + counter bytesSent{0}; + counter bytesReceived{0}; + + /* Log compressor work queue. */ + Sync> logCompressorQueue; + std::condition_variable_any logCompressorWakeup; + + /* Notification sender work queue. FIXME: if hydra-queue-runner is + killed before it has finished sending notifications about a + build, then the notifications may be lost. It would be better + to mark builds with pending notification in the database. */ + typedef std::pair> NotificationItem; + Sync> notificationSenderQueue; + std::condition_variable_any notificationSenderWakeup; + + /* Specific build to do for --build-one (testing only). */ + BuildID buildOne; + +public: + State(); + +private: + + void clearBusy(Connection & conn, time_t stopTime); + + /* (Re)load /etc/nix/machines. */ + void loadMachinesFile(); + + /* Thread to reload /etc/nix/machines periodically. 
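+       The reload is cheap: loadMachinesFile() returns early when
+       the file's inode and mtime are unchanged.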
*/ + void monitorMachinesFile(); + + int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step, + const std::string & machine, BuildStepStatus status, const std::string & errorMsg = "", + BuildID propagatedFrom = 0); + + void finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, BuildID buildId, int stepNr, + const std::string & machine, BuildStepStatus status, const std::string & errorMsg = "", + BuildID propagatedFrom = 0); + + void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status); + + void queueMonitor(); + + void queueMonitorLoop(); + + /* Check the queue for new builds. */ + void getQueuedBuilds(Connection & conn, std::shared_ptr store, unsigned int & lastBuildId); + + /* Handle cancellation, deletion and priority bumps. */ + void processQueueChange(Connection & conn); + + Step::ptr createStep(std::shared_ptr store, const nix::Path & drvPath, + Build::ptr referringBuild, Step::ptr referringStep, std::set & finishedDrvs, + std::set & newSteps, std::set & newRunnable); + + Jobset::ptr createJobset(pqxx::work & txn, + const std::string & projectName, const std::string & jobsetName); + + void processJobsetSharesChange(Connection & conn); + + void makeRunnable(Step::ptr step); + + /* The thread that selects and starts runnable builds. */ + void dispatcher(); + + system_time doDispatch(); + + void wakeDispatcher(); + + void builder(Step::ptr step, Machine::ptr machine, std::shared_ptr reservation); + + /* Perform the given build step. Return true if the step is to be + retried. */ + bool doBuildStep(std::shared_ptr store, Step::ptr step, + Machine::ptr machine); + + void buildRemote(std::shared_ptr store, + Machine::ptr machine, Step::ptr step, + unsigned int maxSilentTime, unsigned int buildTimeout, + RemoteResult & result); + + void markSucceededBuild(pqxx::work & txn, Build::ptr build, + const BuildOutput & res, bool isCachedBuild, time_t startTime, time_t stopTime); + + bool checkCachedFailure(Step::ptr step, Connection & conn); + + /* Thread that asynchronously bzips logs of finished steps. */ + void logCompressor(); + + /* Thread that asynchronously invokes hydra-notify to send build + notifications. */ + void notificationSender(); + + /* Acquire the global queue runner lock, or null if somebody else + has it. */ + std::shared_ptr acquireGlobalLock(); + + void dumpStatus(Connection & conn, bool log); + +public: + + void showStatus(); + + void unlock(); + + void run(BuildID buildOne = 0); +}; + + +template +bool has(const C & c, const V & v) +{ + return c.find(v) != c.end(); +} diff --git a/src/hydra-queue-runner/sync.hh b/src/hydra-queue-runner/sync.hh new file mode 100644 index 00000000..1573f091 --- /dev/null +++ b/src/hydra-queue-runner/sync.hh @@ -0,0 +1,74 @@ +#pragma once + +#include +#include +#include + +/* This template class ensures synchronized access to a value of type + T. It is used as follows: + + struct Data { int x; ... }; + + Sync data; + + { + auto data_(data.lock()); + data_->x = 123; + } + + Here, "data" is automatically unlocked when "data_" goes out of + scope. 
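+
+   To wait for a condition on the data, pass a condition variable
+   to the lock:
+
+     std::condition_variable_any cv;
+
+     {
+       auto data_(data.lock());
+       while (data_->x != 123) data_.wait(cv);
+     }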
+*/
+
+template<class T>
+class Sync
+{
+private:
+    std::mutex mutex;
+    T data;
+
+public:
+
+    Sync() { }
+    Sync(const T & data) : data(data) { }
+
+    class Lock
+    {
+    private:
+        Sync * s;
+        friend Sync;
+        Lock(Sync * s) : s(s) { s->mutex.lock(); }
+    public:
+        Lock(Lock && l) : s(l.s) { l.s = 0; }
+        Lock(const Lock & l) = delete;
+        ~Lock() { if (s) s->mutex.unlock(); }
+        T * operator -> () { return &s->data; }
+        T & operator * () { return s->data; }
+
+        /* FIXME: performance impact of condition_variable_any? */
+        void wait(std::condition_variable_any & cv)
+        {
+            assert(s);
+            cv.wait(s->mutex);
+        }
+
+        template<class Rep, class Period, class Predicate>
+        bool wait_for(std::condition_variable_any & cv,
+            const std::chrono::duration<Rep, Period> & duration,
+            Predicate pred)
+        {
+            assert(s);
+            return cv.wait_for(s->mutex, duration, pred);
+        }
+
+        template<class Clock, class Duration>
+        std::cv_status wait_until(std::condition_variable_any & cv,
+            const std::chrono::time_point<Clock, Duration> & duration)
+        {
+            assert(s);
+            return cv.wait_until(s->mutex, duration);
+        }
+    };
+
+    Lock lock() { return Lock(this); }
+};
diff --git a/src/hydra-queue-runner/token-server.hh b/src/hydra-queue-runner/token-server.hh
new file mode 100644
index 00000000..2ff748e3
--- /dev/null
+++ b/src/hydra-queue-runner/token-server.hh
@@ -0,0 +1,67 @@
+#pragma once
+
+#include <chrono>
+
+#include "sync.hh"
+
+/* This class hands out tokens. There are only ‘maxTokens’ tokens
+   available. Calling get() will return a Token object, representing
+   ownership of a token. If no token is available, get() will sleep
+   until another thread returns a token. */
+
+class TokenServer
+{
+    unsigned int maxTokens;
+
+    Sync<unsigned int> curTokens{0};
+    std::condition_variable_any wakeup;
+
+public:
+    TokenServer(unsigned int maxTokens) : maxTokens(maxTokens) { }
+
+    class Token
+    {
+        friend TokenServer;
+
+        TokenServer * ts;
+
+        bool acquired = false;
+
+        Token(TokenServer * ts, unsigned int timeout) : ts(ts)
+        {
+            auto curTokens(ts->curTokens.lock());
+            while (*curTokens >= ts->maxTokens)
+                if (timeout) {
+                    if (!curTokens.wait_for(ts->wakeup, std::chrono::seconds(timeout),
+                            [&]() { return *curTokens < ts->maxTokens; }))
+                        return;
+                } else
+                    curTokens.wait(ts->wakeup);
+            (*curTokens)++;
+            acquired = true;
+        }
+
+    public:
+
+        Token(Token && t) : ts(t.ts) { t.ts = 0; }
+        Token(const Token & l) = delete;
+
+        ~Token()
+        {
+            if (!ts || !acquired) return;
+            {
+                auto curTokens(ts->curTokens.lock());
+                assert(*curTokens);
+                (*curTokens)--;
+            }
+            ts->wakeup.notify_one();
+        }
+
+        bool operator ()() { return acquired; }
+    };
+
+    Token get(unsigned int timeout = 0)
+    {
+        return Token(this, timeout);
+    }
+};
diff --git a/src/lib/Hydra.pm b/src/lib/Hydra.pm
index 70999ebe..f4583eed 100644
--- a/src/lib/Hydra.pm
+++ b/src/lib/Hydra.pm
@@ -55,7 +55,7 @@ __PACKAGE__->config(
     },
     'Plugin::Session' => {
         expires => 3600 * 24 * 7,
-        storage => ($ENV{'HYDRA_SERVER_DATA'} // Hydra::Model::DB::getHydraPath) . "/session_data",
+        storage => Hydra::Model::DB::getHydraPath . 
"/www/session_data", unlink_on_exit => 0 }, 'Plugin::AccessLog' => { diff --git a/src/lib/Hydra/Controller/Admin.pm b/src/lib/Hydra/Controller/Admin.pm index 95666f07..a09a5edb 100644 --- a/src/lib/Hydra/Controller/Admin.pm +++ b/src/lib/Hydra/Controller/Admin.pm @@ -45,7 +45,7 @@ sub clear_queue_non_current : Chained('admin') PathPart('clear-queue-non-current sub clearfailedcache : Chained('admin') PathPart('clear-failed-cache') Args(0) { my ($self, $c) = @_; - my $r = `nix-store --clear-failed-paths '*'`; + $c->model('DB::FailedPaths')->delete; $c->res->redirect($c->request->referer // "/"); } diff --git a/src/lib/Hydra/Controller/Build.pm b/src/lib/Hydra/Controller/Build.pm index 4080817a..0ab0832e 100644 --- a/src/lib/Hydra/Controller/Build.pm +++ b/src/lib/Hydra/Controller/Build.pm @@ -67,10 +67,6 @@ sub build_GET { $c->stash->{available} = all { isValidPath($_->path) } $build->buildoutputs->all; $c->stash->{drvAvailable} = isValidPath $build->drvpath; - if (!$build->finished && $build->busy) { - $c->stash->{logtext} = decode("utf-8", read_file($build->logfile, err_mode => 'quiet') // ""); - } - if ($build->finished && $build->iscachedbuild) { my $path = ($build->buildoutputs)[0]->path or die; my $cachedBuildStep = findBuildStepByOutPath($self, $c, $path); @@ -123,26 +119,32 @@ sub view_nixlog : Chained('buildChain') PathPart('nixlog') { $c->stash->{step} = $step; - showLog($c, $mode, $step->drvpath, map { $_->path } $step->buildstepoutputs->all); + showLog($c, $mode, $step->busy == 0, $step->drvpath, + map { $_->path } $step->buildstepoutputs->all); } sub view_log : Chained('buildChain') PathPart('log') { my ($self, $c, $mode) = @_; - showLog($c, $mode, $c->stash->{build}->drvpath, map { $_->path } $c->stash->{build}->buildoutputs->all); + showLog($c, $mode, $c->stash->{build}->finished, + $c->stash->{build}->drvpath, + map { $_->path } $c->stash->{build}->buildoutputs->all); } sub showLog { - my ($c, $mode, $drvPath, @outPaths) = @_; + my ($c, $mode, $finished, $drvPath, @outPaths) = @_; my $logPath = findLog($c, $drvPath, @outPaths); notFound($c, "The build log of derivation ‘$drvPath’ is not available.") unless defined $logPath; - my $size = stat($logPath)->size; - error($c, "This build log is too big to display ($size bytes).") - if $size >= 64 * 1024 * 1024; + # Don't send logs that we can't stream. + my $size = stat($logPath)->size; # FIXME: not so meaningful for compressed logs + error($c, "This build log is too big to display ($size bytes).") unless + $mode eq "raw" + || (($mode eq "tail" || $mode eq "tail-reload") && $logPath !~ /\.bz2$/) + || $size < 64 * 1024 * 1024; if (!$mode) { # !!! 
quick hack @@ -154,12 +156,10 @@ sub showLog { } elsif ($mode eq "raw") { - if ($logPath !~ /.bz2$/) { - $c->serve_static_file($logPath); - } else { - $c->stash->{'plain'} = { data => (scalar logContents($logPath)) || " " }; - $c->forward('Hydra::View::Plain'); - } + $c->stash->{logPath} = $logPath; + $c->stash->{finished} = $finished; + $c->forward('Hydra::View::NixLog'); + return; } elsif ($mode eq "tail-reload") { @@ -201,12 +201,18 @@ sub checkPath { sub download : Chained('buildChain') PathPart { - my ($self, $c, $productnr, @path) = @_; + my ($self, $c, $productRef, @path) = @_; - $productnr = 1 if !defined $productnr; + $productRef = 1 if !defined $productRef; - my $product = $c->stash->{build}->buildproducts->find({productnr => $productnr}); - notFound($c, "Build doesn't have a product #$productnr.") if !defined $product; + my $product; + if ($productRef =~ /^[0-9]+$/) { + $product = $c->stash->{build}->buildproducts->find({productnr => $productRef}); + } else { + $product = $c->stash->{build}->buildproducts->find({name => $productRef}); + @path = ($productRef, @path); + } + notFound($c, "Build doesn't have a product $productRef.") if !defined $product; notFound($c, "Build product " . $product->path . " has disappeared.") unless -e $product->path; @@ -473,6 +479,23 @@ sub keep : Chained('buildChain') PathPart Args(1) { } +sub bump : Chained('buildChain') PathPart('bump') { + my ($self, $c, $x) = @_; + + my $build = $c->stash->{build}; + + requireProjectOwner($c, $build->project); # FIXME: require admin? + + $c->model('DB')->schema->txn_do(sub { + $build->update({globalpriority => time()}); + }); + + $c->flash->{successMsg} = "Build has been bumped to the front of the queue."; + + $c->res->redirect($c->uri_for($self->action_for("build"), $c->req->captures)); +} + + sub add_to_release : Chained('buildChain') PathPart('add-to-release') Args(0) { my ($self, $c) = @_; diff --git a/src/lib/Hydra/Controller/Job.pm b/src/lib/Hydra/Controller/Job.pm index 274d9604..475eca37 100644 --- a/src/lib/Hydra/Controller/Job.pm +++ b/src/lib/Hydra/Controller/Job.pm @@ -77,6 +77,9 @@ sub overview : Chained('job') PathPart('') Args(0) { , jobset => $c->stash->{jobset}->name , job => $c->stash->{job}->name })->count == 1 if $c->user_exists; + + $c->stash->{metrics} = [ $job->buildmetrics->search( + { }, { select => ["name"], distinct => 1, order_by => "timestamp desc", }) ]; } @@ -110,6 +113,20 @@ sub output_sizes : Chained('job') PathPart('output-sizes') Args(0) { } +sub metric : Chained('job') PathPart('metric') Args(1) { + my ($self, $c, $metricName) = @_; + + $c->stash->{template} = 'metric.tt'; + $c->stash->{metricName} = $metricName; + + my @res = $c->stash->{job}->buildmetrics->search( + { name => $metricName }, + { order_by => "timestamp", columns => [ "build", "name", "timestamp", "value", "unit" ] }); + + $self->status_ok($c, entity => [ map { { id => $_->get_column("build"), timestamp => $_ ->timestamp, value => $_->value, unit => $_->unit } } @res ]); +} + + # Hydra::Base::Controller::ListBuilds needs this. 
sub get_builds : Chained('job') PathPart('') CaptureArgs(0) { my ($self, $c) = @_; diff --git a/src/lib/Hydra/Controller/Jobset.pm b/src/lib/Hydra/Controller/Jobset.pm index 529a456b..1d1cce63 100644 --- a/src/lib/Hydra/Controller/Jobset.pm +++ b/src/lib/Hydra/Controller/Jobset.pm @@ -162,7 +162,7 @@ sub edit : Chained('jobsetChain') PathPart Args(0) { requireProjectOwner($c, $c->stash->{project}); $c->stash->{template} = 'edit-jobset.tt'; - $c->stash->{edit} = 1; + $c->stash->{edit} = !defined $c->stash->{params}->{cloneJobset}; $c->stash->{cloneJobset} = defined $c->stash->{params}->{cloneJobset}; $c->stash->{totalShares} = getTotalShares($c->model('DB')->schema); } @@ -220,6 +220,9 @@ sub updateJobset { my $enabled = int($c->stash->{params}->{enabled}); die if $enabled < 0 || $enabled > 2; + my $shares = int($c->stash->{params}->{schedulingshares} // 1); + error($c, "The number of scheduling shares must be positive.") if $shares <= 0; + $jobset->update( { name => $jobsetName , description => trim($c->stash->{params}->{"description"}) @@ -232,7 +235,7 @@ sub updateJobset { , keepnr => int(trim($c->stash->{params}->{keepnr})) , checkinterval => int(trim($c->stash->{params}->{checkinterval})) , triggertime => $enabled ? $jobset->triggertime // time() : undef - , schedulingshares => int($c->stash->{params}->{schedulingshares}) + , schedulingshares => $shares }); $jobset->project->jobsetrenames->search({ from_ => $jobsetName })->delete; diff --git a/src/lib/Hydra/Controller/JobsetEval.pm b/src/lib/Hydra/Controller/JobsetEval.pm index c7ea4448..c0b5b0c9 100644 --- a/src/lib/Hydra/Controller/JobsetEval.pm +++ b/src/lib/Hydra/Controller/JobsetEval.pm @@ -180,13 +180,26 @@ sub cancel : Chained('eval') PathPart('cancel') Args(0) { sub restart_aborted : Chained('eval') PathPart('restart-aborted') Args(0) { my ($self, $c) = @_; requireProjectOwner($c, $c->stash->{eval}->project); - my $builds = $c->stash->{eval}->builds->search({ finished => 1, buildstatus => { -in => [3, 4] } }); + my $builds = $c->stash->{eval}->builds->search({ finished => 1, buildstatus => { -in => [3, 4, 9] } }); my $n = restartBuilds($c->model('DB')->schema, $builds); $c->flash->{successMsg} = "$n builds have been restarted."; $c->res->redirect($c->uri_for($c->controller('JobsetEval')->action_for('view'), $c->req->captures)); } +sub bump : Chained('eval') PathPart('bump') Args(0) { + my ($self, $c) = @_; + requireProjectOwner($c, $c->stash->{eval}->project); # FIXME: require admin? + my $builds = $c->stash->{eval}->builds->search({ finished => 0 }); + my $n = $builds->count(); + $c->model('DB')->schema->txn_do(sub { + $builds->update({globalpriority => time()}); + }); + $c->flash->{successMsg} = "$n builds have been bumped to the front of the queue."; + $c->res->redirect($c->uri_for($c->controller('JobsetEval')->action_for('view'), $c->req->captures)); +} + + # Hydra::Base::Controller::NixChannel needs this. sub nix : Chained('eval') PathPart('channel') CaptureArgs(0) { my ($self, $c) = @_; diff --git a/src/lib/Hydra/Controller/Root.pm b/src/lib/Hydra/Controller/Root.pm index c4765065..e4fb4a74 100644 --- a/src/lib/Hydra/Controller/Root.pm +++ b/src/lib/Hydra/Controller/Root.pm @@ -30,7 +30,7 @@ sub begin :Private { $c->stash->{version} = $ENV{"HYDRA_RELEASE"} || ""; $c->stash->{nixVersion} = $ENV{"NIX_RELEASE"} || ""; $c->stash->{curTime} = time; - $c->stash->{logo} = ($c->config->{hydra_logo} // $ENV{"HYDRA_LOGO"}) ? "/logo" : ""; + $c->stash->{logo} = defined $c->config->{hydra_logo} ? 
"/logo" : ""; $c->stash->{tracker} = $ENV{"HYDRA_TRACKER"}; $c->stash->{flashMsg} = $c->flash->{flashMsg}; $c->stash->{successMsg} = $c->flash->{successMsg}; @@ -88,7 +88,7 @@ sub queue_GET { $c->stash->{flashMsg} //= $c->flash->{buildMsg}; $self->status_ok( $c, - entity => [$c->model('DB::Builds')->search({finished => 0}, { order_by => ["id"]})] + entity => [$c->model('DB::Builds')->search({finished => 0}, { order_by => ["globalpriority desc", "id"]})] ); } @@ -111,14 +111,6 @@ sub machines :Local Args(0) { # Add entry for localhost. ${$machines}{''} //= {}; - # Get the last finished build step for each machine. - foreach my $m (keys %{$machines}) { - my $idle = $c->model('DB::BuildSteps')->find( - { machine => "$m", stoptime => { '!=', undef } }, - { order_by => 'stoptime desc', rows => 1 }); - ${$machines}{$m}{'idle'} = $idle ? $idle->stoptime : 0; - } - $c->stash->{machines} = $machines; $c->stash->{steps} = [ $c->model('DB::BuildSteps')->search( { finished => 0, 'me.busy' => 1, 'build.busy' => 1, }, @@ -270,7 +262,7 @@ sub narinfo :LocalRegex('^([a-z0-9]+).narinfo$') :Args(0) { sub logo :Local { my ($self, $c) = @_; - my $path = $c->config->{hydra_logo} // $ENV{"HYDRA_LOGO"} // die("Logo not set!"); + my $path = $c->config->{hydra_logo} // die("Logo not set!"); $c->serve_static_file($path); } @@ -293,6 +285,30 @@ sub evals :Local Args(0) { } +sub steps :Local Args(0) { + my ($self, $c) = @_; + + $c->stash->{template} = 'steps.tt'; + + my $page = int($c->req->param('page') || "1") || 1; + + my $resultsPerPage = 20; + + $c->stash->{page} = $page; + $c->stash->{resultsPerPage} = $resultsPerPage; + $c->stash->{steps} = [ $c->model('DB::BuildSteps')->search( + { starttime => { '!=', undef }, + stoptime => { '!=', undef } + }, + { order_by => [ "stoptime desc" ], + rows => $resultsPerPage, + offset => ($page - 1) * $resultsPerPage + }) ]; + + $c->stash->{total} = approxTableSize($c, "IndexBuildStepsOnStopTime"); +} + + sub search :Local Args(0) { my ($self, $c) = @_; $c->stash->{template} = 'search.tt'; @@ -340,9 +356,9 @@ sub search :Local Args(0) { $c->stash->{buildsdrv} = [ $c->model('DB::Builds')->search( { "drvpath" => trim($query) }, { order_by => ["id desc"] } ) ]; - } + sub log :Local :Args(1) { my ($self, $c, $path) = @_; @@ -352,8 +368,8 @@ sub log :Local :Args(1) { my $logPath = findLog($c, $path, @outpaths); notFound($c, "The build log of $path is not available.") unless defined $logPath; - $c->stash->{'plain'} = { data => (scalar logContents($logPath)) || " " }; - $c->forward('Hydra::View::Plain'); + $c->stash->{logPath} = $logPath; + $c->forward('Hydra::View::NixLog'); } diff --git a/src/lib/Hydra/Helper/AddBuilds.pm b/src/lib/Hydra/Helper/AddBuilds.pm index cbb8f2fd..773b850d 100644 --- a/src/lib/Hydra/Helper/AddBuilds.pm +++ b/src/lib/Hydra/Helper/AddBuilds.pm @@ -22,20 +22,10 @@ use Hydra::Helper::CatalystUtils; our @ISA = qw(Exporter); our @EXPORT = qw( fetchInput evalJobs checkBuild inputsToArgs - getReleaseName addBuildProducts restartBuild - getPrevJobsetEval + restartBuild getPrevJobsetEval ); -sub getReleaseName { - my ($outPath) = @_; - return undef unless -f "$outPath/nix-support/hydra-release-name"; - my $releaseName = read_file("$outPath/nix-support/hydra-release-name"); - chomp $releaseName; - return $releaseName; -} - - sub parseJobName { # Parse a job specification of the form `:: # [attrs]'. The project, jobset and attrs may be omitted. 
The @@ -299,7 +289,7 @@ sub inputsToArgs { my ($inputInfo, $exprType) = @_; my @res = (); - foreach my $input (keys %{$inputInfo}) { + foreach my $input (sort keys %{$inputInfo}) { push @res, "-I", "$input=$inputInfo->{$input}->[0]->{storePath}" if scalar @{$inputInfo->{$input}} == 1 && defined $inputInfo->{$input}->[0]->{storePath}; @@ -367,80 +357,6 @@ sub evalJobs { } -sub addBuildProducts { - my ($db, $build) = @_; - - my $productnr = 1; - my $explicitProducts = 0; - my $storeDir = $Nix::Config::storeDir . "/"; - - foreach my $output ($build->buildoutputs->all) { - my $outPath = $output->path; - if (-e "$outPath/nix-support/hydra-build-products") { - $explicitProducts = 1; - - open LIST, "$outPath/nix-support/hydra-build-products" or die; - while () { - /^([\w\-]+)\s+([\w\-]+)\s+("[^"]*"|\S+)(\s+(\S+))?$/ or next; - my $type = $1; - my $subtype = $2 eq "none" ? "" : $2; - my $path = substr($3, 0, 1) eq "\"" ? substr($3, 1, -1) : $3; - my $defaultPath = $5; - - # Ensure that the path exists and points into the Nix store. - next unless File::Spec->file_name_is_absolute($path); - $path = pathIsInsidePrefix($path, $Nix::Config::storeDir); - next unless defined $path; - next unless -e $path; - - # FIXME: check that the path is in the input closure - # of the build? - - my $fileSize, my $sha1, my $sha256; - - if (-f $path) { - my $st = stat($path) or die "cannot stat $path: $!"; - $fileSize = $st->size; - $sha1 = hashFile("sha1", 0, $path); - $sha256 = hashFile("sha256", 0, $path); - } - - my $name = $path eq $outPath ? "" : basename $path; - - $db->resultset('BuildProducts')->create( - { build => $build->id - , productnr => $productnr++ - , type => $type - , subtype => $subtype - , path => $path - , filesize => $fileSize - , sha1hash => $sha1 - , sha256hash => $sha256 - , name => $name - , defaultpath => $defaultPath - }); - } - close LIST; - } - } - - return if $explicitProducts; - - foreach my $output ($build->buildoutputs->all) { - my $outPath = $output->path; - next unless -d $outPath; - $db->resultset('BuildProducts')->create( - { build => $build->id - , productnr => $productnr++ - , type => "nix-build" - , subtype => $output->name eq "out" ? "" : $output->name - , path => $outPath - , name => $build->nixname - }); - } -} - - # Return the most recent evaluation of the given jobset (that # optionally had new builds), or undefined if no such evaluation # exists. @@ -513,40 +429,6 @@ sub checkBuild { my $time = time(); - # Are the outputs already in the Nix store? Then add a cached - # build. - my %extraFlags; - my $allValid = 1; - my $buildStatus; - my $releaseName; - foreach my $name (@outputNames) { - my $path = $buildInfo->{outputs}->{$name}; - if (isValidPath($path)) { - if (-f "$path/nix-support/failed") { - $buildStatus = 6; - } else { - $buildStatus //= 0; - } - $releaseName //= getReleaseName($path); - } else { - $allValid = 0; - last; - } - } - - if ($allValid) { - %extraFlags = - ( finished => 1 - , iscachedbuild => 1 - , buildstatus => $buildStatus - , starttime => $time - , stoptime => $time - , releasename => $releaseName - ); - } else { - %extraFlags = ( finished => 0 ); - } - # Add the build to the database. 
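+    # Builds now always start out unfinished; hydra-queue-runner itself
+    # detects cached builds and marks them finished.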
$build = $job->builds->create( { timestamp => $time @@ -562,10 +444,10 @@ sub checkBuild { , nixexprinput => $jobset->nixexprinput , nixexprpath => $jobset->nixexprpath , priority => $buildInfo->{schedulingPriority} + , finished => 0 , busy => 0 , locker => "" , iscurrent => 1 - , %extraFlags }); $build->buildoutputs->create({ name => $_, path => $buildInfo->{outputs}->{$_} }) @@ -574,13 +456,7 @@ sub checkBuild { $buildMap->{$build->id} = { id => $build->id, jobName => $jobName, new => 1, drvPath => $drvPath }; $$jobOutPathMap{$jobName . "\t" . $firstOutputPath} = $build->id; - if ($build->iscachedbuild) { - #print STDERR " marked as cached build ", $build->id, "\n"; - addBuildProducts($db, $build); - notifyBuildFinished($plugins, $build, []); - } else { - print STDERR "added build ${\$build->id} (${\$jobset->project->name}:${\$jobset->name}:$jobName)\n"; - } + print STDERR "added build ${\$build->id} (${\$jobset->project->name}:${\$jobset->name}:$jobName)\n"; }); return $build; diff --git a/src/lib/Hydra/Helper/CatalystUtils.pm b/src/lib/Hydra/Helper/CatalystUtils.pm index 35e15927..d31b3f53 100644 --- a/src/lib/Hydra/Helper/CatalystUtils.pm +++ b/src/lib/Hydra/Helper/CatalystUtils.pm @@ -23,6 +23,7 @@ our @EXPORT = qw( showStatus getResponsibleAuthors setCacheHeaders + approxTableSize ); @@ -296,4 +297,11 @@ sub setCacheHeaders { } +sub approxTableSize { + my ($c, $name) = @_; + return $c->model('DB')->schema->storage->dbh->selectrow_hashref( + "select reltuples::int from pg_class where relname = lower(?)", { }, $name)->{"reltuples"}; +} + + 1; diff --git a/src/lib/Hydra/Helper/Email.pm b/src/lib/Hydra/Helper/Email.pm index 47fc5629..e628fca3 100644 --- a/src/lib/Hydra/Helper/Email.pm +++ b/src/lib/Hydra/Helper/Email.pm @@ -13,7 +13,7 @@ sub sendEmail { my ($config, $to, $subject, $body, $extraHeaders) = @_; my $url = getBaseUrl($config); - my $sender = $config->{'notification_sender'} // (($ENV{'USER'} // "hydra") . "@" . $url); + my $sender = $config->{'notification_sender'} // (($ENV{'USER'} // "hydra") . "@" . hostname_long); my @headers = ( To => $to, diff --git a/src/lib/Hydra/Helper/Nix.pm b/src/lib/Hydra/Helper/Nix.pm index 16d498fb..3a898745 100644 --- a/src/lib/Hydra/Helper/Nix.pm +++ b/src/lib/Hydra/Helper/Nix.pm @@ -133,8 +133,9 @@ sub getDrvLogPath { my $base = basename $drvPath; my $bucketed = substr($base, 0, 2) . "/" . substr($base, 2); my $fn = ($ENV{NIX_LOG_DIR} || "/nix/var/log/nix") . "/drvs/"; - for ($fn . $bucketed . ".bz2", $fn . $bucketed, $fn . $base . ".bz2", $fn . $base) { - return $_ if (-f $_); + my $fn2 = Hydra::Model::DB::getHydraPath . "/build-logs/"; + for ($fn2 . $bucketed, $fn2 . $bucketed . ".bz2", $fn . $bucketed . ".bz2", $fn . $bucketed, $fn . $base . ".bz2", $fn . $base) { + return $_ if -f $_; } return undef; } @@ -423,7 +424,7 @@ sub getTotalShares { sub cancelBuilds($$) { my ($db, $builds) = @_; return txn_do($db, sub { - $builds = $builds->search({ finished => 0, busy => 0 }); + $builds = $builds->search({ finished => 0 }); my $n = $builds->count; my $time = time(); $builds->update( @@ -448,7 +449,7 @@ sub restartBuilds($$) { foreach my $build ($builds->all) { next if !isValidPath($build->drvpath); - push @paths, $build->drvpath; + push @paths, $_->path foreach $build->buildoutputs->all; push @buildIds, $build->id; registerRoot $build->drvpath; } @@ -464,9 +465,10 @@ sub restartBuilds($$) { # !!! Should do this in a trigger. 
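approxTableSize reads the planner's row estimate from the PostgreSQL catalog instead of running a COUNT(*) scan; reltuples is only as fresh as the last (auto)vacuum or analyze, which is fine for a pager total. Note that the steps page passes an index name (IndexBuildStepsOnStopTime) rather than the table name, so the estimate presumably covers just the rows that index includes. A standalone sketch; the DSN and credentials are placeholders:

    use strict;
    use warnings;
    use DBI;

    # Connect with placeholder credentials; adjust for your setup.
    my $dbh = DBI->connect("dbi:Pg:dbname=hydra", "hydra", "", { RaiseError => 1 });

    # pg_class covers tables and indexes alike; reltuples is the
    # planner's estimated row count, not an exact figure.
    sub approx_table_size {
        my ($dbh, $name) = @_;
        my ($n) = $dbh->selectrow_array(
            "select reltuples::int from pg_class where relname = lower(?)",
            {}, $name);
        return $n // 0;
    }

    print approx_table_size($dbh, "IndexBuildStepsOnStopTime"), "\n";
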
$db->resultset('JobsetEvals')->search({ build => \@buildIds }, { join => 'buildIds' })->update({ nrsucceeded => undef }); - # Clear Nix's negative failure cache. + # Clear the failed paths cache. # FIXME: Add this to the API. - system("nix-store", "--clear-failed-paths", @paths); + # FIXME: clear the dependencies? + $db->resultset('FailedPaths')->search({ path => [ @paths ]})->delete; }); return scalar(@buildIds); diff --git a/src/lib/Hydra/Plugin/GitInput.pm b/src/lib/Hydra/Plugin/GitInput.pm index 66aaf625..4d75538e 100644 --- a/src/lib/Hydra/Plugin/GitInput.pm +++ b/src/lib/Hydra/Plugin/GitInput.pm @@ -163,7 +163,7 @@ sub getCommits { my ($uri, $branch, $deepClone) = _parseValue($value); - my $clonePath = $self->_cloneRepo($uri, $branch, $deepClone); + my $clonePath = getSCMCacheDir . "/git/" . sha256_hex($uri); my $out = grab(cmd => ["git", "log", "--pretty=format:%H%x09%an%x09%ae%x09%at", "$rev1..$rev2"], dir => $clonePath); diff --git a/src/lib/Hydra/Schema/BuildMetrics.pm b/src/lib/Hydra/Schema/BuildMetrics.pm new file mode 100644 index 00000000..58bbed94 --- /dev/null +++ b/src/lib/Hydra/Schema/BuildMetrics.pm @@ -0,0 +1,187 @@ +use utf8; +package Hydra::Schema::BuildMetrics; + +# Created by DBIx::Class::Schema::Loader +# DO NOT MODIFY THE FIRST PART OF THIS FILE + +=head1 NAME + +Hydra::Schema::BuildMetrics + +=cut + +use strict; +use warnings; + +use base 'DBIx::Class::Core'; + +=head1 COMPONENTS LOADED + +=over 4 + +=item * L + +=back + +=cut + +__PACKAGE__->load_components("+Hydra::Component::ToJSON"); + +=head1 TABLE: C + +=cut + +__PACKAGE__->table("BuildMetrics"); + +=head1 ACCESSORS + +=head2 build + + data_type: 'integer' + is_foreign_key: 1 + is_nullable: 0 + +=head2 name + + data_type: 'text' + is_nullable: 0 + +=head2 unit + + data_type: 'text' + is_nullable: 1 + +=head2 value + + data_type: 'double precision' + is_nullable: 0 + +=head2 project + + data_type: 'text' + is_foreign_key: 1 + is_nullable: 0 + +=head2 jobset + + data_type: 'text' + is_foreign_key: 1 + is_nullable: 0 + +=head2 job + + data_type: 'text' + is_foreign_key: 1 + is_nullable: 0 + +=head2 timestamp + + data_type: 'integer' + is_nullable: 0 + +=cut + +__PACKAGE__->add_columns( + "build", + { data_type => "integer", is_foreign_key => 1, is_nullable => 0 }, + "name", + { data_type => "text", is_nullable => 0 }, + "unit", + { data_type => "text", is_nullable => 1 }, + "value", + { data_type => "double precision", is_nullable => 0 }, + "project", + { data_type => "text", is_foreign_key => 1, is_nullable => 0 }, + "jobset", + { data_type => "text", is_foreign_key => 1, is_nullable => 0 }, + "job", + { data_type => "text", is_foreign_key => 1, is_nullable => 0 }, + "timestamp", + { data_type => "integer", is_nullable => 0 }, +); + +=head1 PRIMARY KEY + +=over 4 + +=item * L + +=item * L + +=back + +=cut + +__PACKAGE__->set_primary_key("build", "name"); + +=head1 RELATIONS + +=head2 build + +Type: belongs_to + +Related object: L + +=cut + +__PACKAGE__->belongs_to( + "build", + "Hydra::Schema::Builds", + { id => "build" }, + { is_deferrable => 0, on_delete => "CASCADE", on_update => "NO ACTION" }, +); + +=head2 job + +Type: belongs_to + +Related object: L + +=cut + +__PACKAGE__->belongs_to( + "job", + "Hydra::Schema::Jobs", + { jobset => "jobset", name => "job", project => "project" }, + { is_deferrable => 0, on_delete => "NO ACTION", on_update => "CASCADE" }, +); + +=head2 jobset + +Type: belongs_to + +Related object: L + +=cut + +__PACKAGE__->belongs_to( + "jobset", + "Hydra::Schema::Jobsets", + { name => 
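The getDrvLogPath change above makes Hydra's own build-logs directory the first place probed, using two-character bucketing so no single directory accumulates millions of log files. A sketch of the candidate paths, with a made-up derivation path:

    use strict;
    use warnings;
    use File::Basename;

    # The first two characters of the derivation's base name pick a
    # subdirectory; both plain and bzip2-compressed logs are probed.
    sub candidate_log_paths {
        my ($drvPath, $hydraData) = @_;     # $hydraData: e.g. /var/lib/hydra
        my $base     = basename $drvPath;   # "<hash>-<name>.drv"
        my $bucketed = substr($base, 0, 2) . "/" . substr($base, 2);
        my $dir      = "$hydraData/build-logs/";
        return ($dir . $bucketed, $dir . $bucketed . ".bz2");
    }

    # Made-up store path, for illustration only.
    print "$_\n" for candidate_log_paths(
        "/nix/store/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-hello-2.10.drv",
        "/var/lib/hydra");
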
"jobset", project => "project" }, + { is_deferrable => 0, on_delete => "NO ACTION", on_update => "CASCADE" }, +); + +=head2 project + +Type: belongs_to + +Related object: L + +=cut + +__PACKAGE__->belongs_to( + "project", + "Hydra::Schema::Projects", + { name => "project" }, + { is_deferrable => 0, on_delete => "NO ACTION", on_update => "CASCADE" }, +); + + +# Created by DBIx::Class::Schema::Loader v0.07043 @ 2015-07-30 16:52:20 +# DO NOT MODIFY THIS OR ANYTHING ABOVE! md5sum:qoPm5/le+sVHigW4Dmum2Q + +sub json_hint { + return { columns => ['value', 'unit'] }; +} + +1; diff --git a/src/lib/Hydra/Schema/Builds.pm b/src/lib/Hydra/Schema/Builds.pm index 852c6c32..b87b7958 100644 --- a/src/lib/Hydra/Schema/Builds.pm +++ b/src/lib/Hydra/Schema/Builds.pm @@ -138,6 +138,12 @@ __PACKAGE__->table("Builds"); default_value: 0 is_nullable: 0 +=head2 globalpriority + + data_type: 'integer' + default_value: 0 + is_nullable: 0 + =head2 busy data_type: 'integer' @@ -241,6 +247,8 @@ __PACKAGE__->add_columns( { data_type => "text", is_nullable => 1 }, "priority", { data_type => "integer", default_value => 0, is_nullable => 0 }, + "globalpriority", + { data_type => "integer", default_value => 0, is_nullable => 0 }, "busy", { data_type => "integer", default_value => 0, is_nullable => 0 }, "locker", @@ -341,6 +349,21 @@ __PACKAGE__->has_many( undef, ); +=head2 buildmetrics + +Type: has_many + +Related object: L + +=cut + +__PACKAGE__->has_many( + "buildmetrics", + "Hydra::Schema::BuildMetrics", + { "foreign.build" => "self.id" }, + undef, +); + =head2 buildoutputs Type: has_many @@ -401,6 +424,21 @@ __PACKAGE__->has_many( undef, ); +=head2 buildsteps_propagatedfroms + +Type: has_many + +Related object: L + +=cut + +__PACKAGE__->has_many( + "buildsteps_propagatedfroms", + "Hydra::Schema::BuildSteps", + { "foreign.propagatedfrom" => "self.id" }, + undef, +); + =head2 job Type: belongs_to @@ -509,19 +547,19 @@ __PACKAGE__->many_to_many( Type: many_to_many -Composing rels: L -> constituent +Composing rels: L -> constituent =cut __PACKAGE__->many_to_many( "constituents", - "aggregateconstituents_constituents", + "aggregateconstituents_aggregates", "constituent", ); -# Created by DBIx::Class::Schema::Loader v0.07033 @ 2014-09-30 15:38:03 -# DO NOT MODIFY THIS OR ANYTHING ABOVE! md5sum:kMPje7yi/yDqxGRQcC2I/Q +# Created by DBIx::Class::Schema::Loader v0.07043 @ 2015-08-10 15:10:41 +# DO NOT MODIFY THIS OR ANYTHING ABOVE! 
md5sum:rjifgnPtjY96MaQ7eiGzaA __PACKAGE__->has_many( "dependents", @@ -615,6 +653,7 @@ my %hint = ( buildoutputs => 'name', buildinputs_builds => 'name', buildproducts => 'productnr', + buildmetrics => 'name', } ); diff --git a/src/lib/Hydra/Schema/FailedPaths.pm b/src/lib/Hydra/Schema/FailedPaths.pm new file mode 100644 index 00000000..082b989d --- /dev/null +++ b/src/lib/Hydra/Schema/FailedPaths.pm @@ -0,0 +1,65 @@ +use utf8; +package Hydra::Schema::FailedPaths; + +# Created by DBIx::Class::Schema::Loader +# DO NOT MODIFY THE FIRST PART OF THIS FILE + +=head1 NAME + +Hydra::Schema::FailedPaths + +=cut + +use strict; +use warnings; + +use base 'DBIx::Class::Core'; + +=head1 COMPONENTS LOADED + +=over 4 + +=item * L + +=back + +=cut + +__PACKAGE__->load_components("+Hydra::Component::ToJSON"); + +=head1 TABLE: C + +=cut + +__PACKAGE__->table("FailedPaths"); + +=head1 ACCESSORS + +=head2 path + + data_type: 'text' + is_nullable: 0 + +=cut + +__PACKAGE__->add_columns("path", { data_type => "text", is_nullable => 0 }); + +=head1 PRIMARY KEY + +=over 4 + +=item * L + +=back + +=cut + +__PACKAGE__->set_primary_key("path"); + + +# Created by DBIx::Class::Schema::Loader v0.07033 @ 2015-06-10 14:48:16 +# DO NOT MODIFY THIS OR ANYTHING ABOVE! md5sum:WFgjfjH+szE6Ntcicmaflw + + +# You can replace this text with custom code or comments, and it will be preserved on regeneration +1; diff --git a/src/lib/Hydra/Schema/Jobs.pm b/src/lib/Hydra/Schema/Jobs.pm index 883c1bee..cd89ed3d 100644 --- a/src/lib/Hydra/Schema/Jobs.pm +++ b/src/lib/Hydra/Schema/Jobs.pm @@ -81,6 +81,25 @@ __PACKAGE__->set_primary_key("project", "jobset", "name"); =head1 RELATIONS +=head2 buildmetrics + +Type: has_many + +Related object: L + +=cut + +__PACKAGE__->has_many( + "buildmetrics", + "Hydra::Schema::BuildMetrics", + { + "foreign.job" => "self.name", + "foreign.jobset" => "self.jobset", + "foreign.project" => "self.project", + }, + undef, +); + =head2 builds Type: has_many @@ -150,7 +169,7 @@ __PACKAGE__->has_many( ); -# Created by DBIx::Class::Schema::Loader v0.07033 @ 2014-09-29 19:41:42 -# DO NOT MODIFY THIS OR ANYTHING ABOVE! md5sum:lnZSd0gDXgLk8WQeAFqByA +# Created by DBIx::Class::Schema::Loader v0.07043 @ 2015-07-30 16:52:20 +# DO NOT MODIFY THIS OR ANYTHING ABOVE! md5sum:vDAo9bzLca+QWfhOb9OLMg 1; diff --git a/src/lib/Hydra/Schema/Jobsets.pm b/src/lib/Hydra/Schema/Jobsets.pm index f304f770..682b3ab0 100644 --- a/src/lib/Hydra/Schema/Jobsets.pm +++ b/src/lib/Hydra/Schema/Jobsets.pm @@ -184,6 +184,24 @@ __PACKAGE__->set_primary_key("project", "name"); =head1 RELATIONS +=head2 buildmetrics + +Type: has_many + +Related object: L + +=cut + +__PACKAGE__->has_many( + "buildmetrics", + "Hydra::Schema::BuildMetrics", + { + "foreign.jobset" => "self.name", + "foreign.project" => "self.project", + }, + undef, +); + =head2 builds Type: has_many @@ -320,8 +338,8 @@ __PACKAGE__->has_many( ); -# Created by DBIx::Class::Schema::Loader v0.07033 @ 2014-04-23 23:13:51 -# DO NOT MODIFY THIS OR ANYTHING ABOVE! md5sum:CO0aE+jrjB+UrwGRzWZLlw +# Created by DBIx::Class::Schema::Loader v0.07043 @ 2015-07-30 16:52:20 +# DO NOT MODIFY THIS OR ANYTHING ABOVE! 
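With the FailedPaths table, a cached failure is a primary-key row rather than an entry in Nix's removed failure cache, so checking membership is a lookup and clearing it (as restartBuilds now does) is a plain DELETE. A minimal sketch against a Hydra::Model::DB schema:

    use strict;
    use warnings;

    # Membership test: path is the primary key, so find() suffices.
    sub is_cached_failure {
        my ($db, $path) = @_;
        return defined $db->resultset('FailedPaths')->find($path);
    }

    # "Clearing the failure cache" is now just a DELETE, mirroring the
    # restartBuilds change above.
    sub clear_failures {
        my ($db, @paths) = @_;
        $db->resultset('FailedPaths')->search({ path => [ @paths ] })->delete;
    }
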
md5sum:Coci9FdBAvUO9T3st2NEqA my %hint = ( columns => [ diff --git a/src/lib/Hydra/Schema/Projects.pm b/src/lib/Hydra/Schema/Projects.pm index 9bf9dde3..e04b1f8e 100644 --- a/src/lib/Hydra/Schema/Projects.pm +++ b/src/lib/Hydra/Schema/Projects.pm @@ -106,6 +106,21 @@ __PACKAGE__->set_primary_key("name"); =head1 RELATIONS +=head2 buildmetrics + +Type: has_many + +Related object: L + +=cut + +__PACKAGE__->has_many( + "buildmetrics", + "Hydra::Schema::BuildMetrics", + { "foreign.project" => "self.name" }, + undef, +); + =head2 builds Type: has_many @@ -267,8 +282,8 @@ Composing rels: L -> username __PACKAGE__->many_to_many("usernames", "projectmembers", "username"); -# Created by DBIx::Class::Schema::Loader v0.07033 @ 2014-04-23 23:13:08 -# DO NOT MODIFY THIS OR ANYTHING ABOVE! md5sum:fkd9ruEoVSBGIktmAj4u4g +# Created by DBIx::Class::Schema::Loader v0.07043 @ 2015-07-30 16:52:20 +# DO NOT MODIFY THIS OR ANYTHING ABOVE! md5sum:67kWIE0IGmEJTvOIATAKaw my %hint = ( columns => [ diff --git a/src/lib/Hydra/Schema/SystemStatus.pm b/src/lib/Hydra/Schema/SystemStatus.pm new file mode 100644 index 00000000..7c99e780 --- /dev/null +++ b/src/lib/Hydra/Schema/SystemStatus.pm @@ -0,0 +1,75 @@ +use utf8; +package Hydra::Schema::SystemStatus; + +# Created by DBIx::Class::Schema::Loader +# DO NOT MODIFY THE FIRST PART OF THIS FILE + +=head1 NAME + +Hydra::Schema::SystemStatus + +=cut + +use strict; +use warnings; + +use base 'DBIx::Class::Core'; + +=head1 COMPONENTS LOADED + +=over 4 + +=item * L + +=back + +=cut + +__PACKAGE__->load_components("+Hydra::Component::ToJSON"); + +=head1 TABLE: C + +=cut + +__PACKAGE__->table("SystemStatus"); + +=head1 ACCESSORS + +=head2 what + + data_type: 'text' + is_nullable: 0 + +=head2 status + + data_type: 'json' + is_nullable: 0 + +=cut + +__PACKAGE__->add_columns( + "what", + { data_type => "text", is_nullable => 0 }, + "status", + { data_type => "json", is_nullable => 0 }, +); + +=head1 PRIMARY KEY + +=over 4 + +=item * L + +=back + +=cut + +__PACKAGE__->set_primary_key("what"); + + +# Created by DBIx::Class::Schema::Loader v0.07043 @ 2015-07-30 16:01:22 +# DO NOT MODIFY THIS OR ANYTHING ABOVE! 
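SystemStatus is a one-row-per-component table whose `status' column holds a JSON document, keyed by `what'. A sketch of how a component might publish its state; the "queue-runner" key and the counter shape are assumptions:

    use strict;
    use warnings;
    use JSON;

    # update_or_create gives upsert semantics on the `what' primary
    # key, so repeated publishes overwrite the previous snapshot.
    sub publish_status {
        my ($db, $status) = @_;   # $status: hashref of counters (assumed shape)
        $db->resultset('SystemStatus')->update_or_create(
            { what => "queue-runner", status => encode_json($status) });
    }
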
md5sum:JCYi4+HwM22iucdFkhBjMg + + +# You can replace this text with custom code or comments, and it will be preserved on regeneration +1; diff --git a/src/lib/Hydra/View/NixLog.pm b/src/lib/Hydra/View/NixLog.pm new file mode 100644 index 00000000..fe10755c --- /dev/null +++ b/src/lib/Hydra/View/NixLog.pm @@ -0,0 +1,30 @@ +package Hydra::View::NixLog; + +use strict; +use base qw/Catalyst::View/; +use Hydra::Helper::CatalystUtils; + +sub process { + my ($self, $c) = @_; + + my $logPath = $c->stash->{logPath}; + + $c->response->content_type('text/plain'); + + my $fh = new IO::Handle; + + if ($logPath =~ /\.bz2$/) { + open $fh, "bzip2 -dc < '$logPath' |" or die; + } else { + open $fh, "<$logPath" or die; + } + binmode($fh); + + setCacheHeaders($c, 365 * 24 * 60 * 60) if $c->stash->{finished}; + + $c->response->body($fh); + + return 1; +} + +1; diff --git a/src/lib/Hydra/View/TT.pm b/src/lib/Hydra/View/TT.pm index 2f9d4201..be3cf493 100644 --- a/src/lib/Hydra/View/TT.pm +++ b/src/lib/Hydra/View/TT.pm @@ -9,7 +9,7 @@ __PACKAGE__->config( ENCODING => 'utf-8', PRE_CHOMP => 1, POST_CHOMP => 1, - expose_methods => [qw/buildLogExists buildStepLogExists jobExists/]); + expose_methods => [qw/buildLogExists buildStepLogExists jobExists stripSSHUser/]); sub buildLogExists { my ($self, $c, $build) = @_; @@ -23,6 +23,16 @@ sub buildStepLogExists { return defined findLog($c, $step->drvpath, @outPaths); } + +sub stripSSHUser { + my ($self, $c, $name) = @_; + if ($name =~ /^.*@(.*)$/) { + return $1; + } else { + return $name; + } +} + # Check whether the given job is a member of the most recent jobset # evaluation. sub jobExists { diff --git a/src/root/build.tt b/src/root/build.tt index 2ddf5fd7..ec1fe10a 100644 --- a/src/root/build.tt +++ b/src/root/build.tt @@ -18,9 +18,10 @@ NrWhatDurationMachineStatus - [% FOREACH step IN build.buildsteps %] + [% FOREACH step IN build.buildsteps.reverse %] [% IF ( type == "All" ) || ( type == "Failed" && step.status != 0 ) || ( type == "Running" && step.busy == 1 ) %] - [% has_log = buildStepLogExists(step); + [% has_log = seen.${step.drvpath} ? 0 : buildStepLogExists(step); + seen.${step.drvpath} = 1; log = c.uri_for('/build' build.id 'nixlog' step.stepnr); %] [% step.stepnr %] @@ -33,27 +34,35 @@ [% IF step.busy == 0; - INCLUDE renderDuration duration = step.stoptime - step.starttime; + IF step.stoptime; + INCLUDE renderDuration duration = step.stoptime - step.starttime; + ELSE; + %]n/a[% + END; ELSIF build.finished; INCLUDE renderDuration duration = build.stoptime - step.starttime; ELSE; INCLUDE renderDuration duration = curTime - step.starttime; END %] - [% step.machine.split('@').1 || step.machine %] + [% IF step.busy == 1 || step.status == 0 || step.status == 1 || step.status == 4 || step.status == 7; INCLUDE renderMachineName machine=step.machine; ELSE; "n/a"; END %] [% IF step.busy == 1 %] Building [% ELSIF step.status == 0 %] Succeeded [% ELSIF step.status == 4 %] - Aborted + Aborted[% IF step.errormsg %]: [% HTML.escape(step.errormsg); END %] [% ELSIF step.status == 7 %] Timed out [% ELSIF step.status == 8 %] Cached failure - [% ELSE %] + [% ELSIF step.status == 9 %] + Unsupported system type + [% ELSIF step.errormsg %] Failed: [% HTML.escape(step.errormsg) %] + [% ELSE %] + Failed [% END %] [%%] [%+ IF has_log; INCLUDE renderLogLinks url=log inRow=1; END %] [%+ IF step.propagatedfrom; %](propagated from [% INCLUDE renderBuildIdLink id=step.propagatedfrom.get_column('id') %])[% END %] @@ -87,6 +96,7 @@
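The NixLog view above interpolates $logPath into a shell command; a variant that achieves the same streaming decompression without involving a shell is the list-form pipe open. A sketch, with an example path:

    use strict;
    use warnings;

    # List-form pipe open: bzip2's arguments are passed directly, so a
    # log path containing shell metacharacters cannot be misparsed.
    my $logPath = "/var/lib/hydra/build-logs/ab/cdef.bz2";   # example path
    open(my $fh, "-|", "bzip2", "-dc", $logPath)
        or die "cannot decompress $logPath: $!";
    binmode($fh);
    print while <$fh>;
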
          [% INCLUDE menuItem ... title = "Restart" %]
        [% ELSE %]
          [% INCLUDE menuItem ... title = "Cancel" %]
+         [% INCLUDE menuItem ... title = "Bump up" %]
        [% END %]
      [% IF available && project.releases %]
        [% INCLUDE menuItem ... %]
@@ -97,7 +107,7 @@
      [% END %]
-
+
        Summary
        [% IF isAggregate %]
          Constituents
        [% END %]
        Details
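stripSSHUser and the renderMachineName block added to common.tt below shorten a machine name such as root@mach1.example.org to mach1 while leaving bare IP addresses intact. The same logic in plain Perl:

    use strict;
    use warnings;

    # Drop the ssh user, then keep only the leading host label; the
    # pattern treats ".<digit>" as part of the label so IPs survive.
    sub display_machine_name {
        my ($name) = @_;
        return "localhost" if !$name;     # the '' entry is the local machine
        $name =~ s/^.*@//;                # stripSSHUser
        $name =~ /^(([^\.]|\.[0-9])*)/;   # same pattern as common.tt
        return $1;
    }

    print display_machine_name('hydra-queue-runner@mach1.example.org'), "\n";  # mach1
    print display_machine_name('192.168.1.5'), "\n";                           # 192.168.1.5
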
  • @@ -178,13 +188,17 @@ [% IF cachedBuild; INCLUDE renderFullBuildLink build=cachedBuild; ELSE %]unknown[% END %] [% END %] - [% IF !isAggregate && build.finished %] - - Duration: - [% actualBuild = build.iscachedbuild ? cachedBuild : build; - INCLUDE renderDuration duration = actualBuild.stoptime - actualBuild.starttime %]; - finished at [% INCLUDE renderDateTime timestamp = actualBuild.stoptime %] - + [% IF !isAggregate && build.finished; actualBuild = build.iscachedbuild ? cachedBuild : build %] + [% IF actualBuild %] + + Duration: + [% INCLUDE renderDuration duration = actualBuild.stoptime - actualBuild.starttime %] + + [% END %] + + Finished at: + [% INCLUDE renderDateTime timestamp = build.stoptime; %] + [% END %] [% IF !isAggregate && buildLogExists(build) %] @@ -267,11 +281,6 @@ [% END %] - [% IF logtext %] -

-    <h2>Log</h2>
-    <pre class="log">[% HTML.escape(logtext) %]</pre>
-  [% END %]
-
   [% IF isAggregate %]
@@ -290,7 +299,7 @@
-          <th>Queued:</th>
+          <th>Queued at:</th>
           <td>[% INCLUDE renderDateTime timestamp = build.timestamp %]</td>
           [% IF build.finished && !build.iscachedbuild %]
@@ -377,6 +386,25 @@
     [% END %]
+
+  [% IF build.finished && build.buildmetrics %]
+    <h3>Metrics</h3>
+    <table class="table">
+      <thead>
+        <tr><th>Name</th><th>Value</th></tr>
+      </thead>
+      <tbody>
+        [% FOREACH metric IN build.buildmetrics %]
+          <tr>
+            <td>[% HTML.escape(metric.name) %]</td>
+            <td>[% metric.value %] [% metric.unit %]</td>
+          </tr>
+        [% END %]
+      </tbody>
+    </table>
+  [% END %]
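These metrics originate in the build itself. A hedged sketch of the producer side, assuming each line of $out/nix-support/hydra-metrics is `name value [unit]' and becomes one BuildMetrics row:

    use strict;
    use warnings;

    # Parse a hydra-metrics file into name/value/unit records; the
    # exact field grammar is an assumption here.
    sub parse_metrics_file {
        my ($outPath) = @_;
        my $fn = "$outPath/nix-support/hydra-metrics";
        return () unless -f $fn;
        my @metrics;
        open(my $fh, "<", $fn) or die "cannot open $fn: $!";
        while (my $line = <$fh>) {
            my ($name, $value, $unit) = split ' ', $line;
            next unless defined $name && defined $value;
            push @metrics, { name => $name, value => $value, unit => $unit };
        }
        close $fh;
        return @metrics;
    }
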
    diff --git a/src/root/common.tt b/src/root/common.tt index 44902431..6acea0c6 100644 --- a/src/root/common.tt +++ b/src/root/common.tt @@ -198,12 +198,14 @@ BLOCK renderBuildStatusIcon; Failed [% ELSIF buildstatus == 2 || buildstatus == 5 %] Dependency failed - [% ELSIF buildstatus == 3 %] + [% ELSIF buildstatus == 3 || buildstatus == 9 %] Aborted [% ELSIF buildstatus == 4 %] Cancelled [% ELSIF buildstatus == 6 %] - Failed (with result) + Failed with output + [% ELSIF buildstatus == 7 %] + Timed out [% ELSE %] Failed [% END; @@ -228,7 +230,11 @@ BLOCK renderStatus; [% ELSIF buildstatus == 4 %] Cancelled by user [% ELSIF buildstatus == 6 %] - Build failed (with result) + Build failed with output + [% ELSIF buildstatus == 7 %] + Timed out + [% ELSIF buildstatus == 9 %] + Unsupported system type [% ELSE %] Aborted (Hydra failure; see below) @@ -566,12 +572,14 @@ BLOCK createChart %] success: function(data) { var ids = []; var d = []; - var max = 0; + var maxTime = 0; + var minTime = Number.MAX_SAFE_INTEGER; data.forEach(function(x) { var t = x.timestamp * 1000; ids[t] = x.id; d.push([t, x.value [% IF yaxis == "mib" %] / (1024.0 * 1024.0)[% END %]]); - max = Math.max(t, max); + maxTime = Math.max(t, maxTime); + minTime = Math.min(t, minTime); }); var options = { @@ -634,7 +642,7 @@ BLOCK createChart %] }); // Zoom in to the last two months by default. - plot.setSelection({ xaxis: { from: max - 60 * 24 * 60 * 60 * 1000, to: max } }); + plot.setSelection({ xaxis: { from: Math.max(minTime, maxTime - 60 * 24 * 60 * 60 * 1000), to: maxTime } }); } }); }); @@ -643,4 +651,9 @@ BLOCK createChart %] [% END; +BLOCK renderMachineName; +machine ? stripSSHUser(machine).match('^(([^\.]|\.[0-9])*)').0 : "localhost"; +END; + + %] diff --git a/src/root/job.tt b/src/root/job.tt index 998bc083..a9e1ca77 100644 --- a/src/root/job.tt +++ b/src/root/job.tt @@ -98,6 +98,14 @@ removed or had an evaluation error.
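The chart change in common.tt clamps the default zoom: still the last two months, but never before the earliest data point, so sparse charts no longer open onto an empty window. The same arithmetic in Perl:

    use strict;
    use warnings;
    use List::Util qw(max);

    # Times are epoch milliseconds, matching the JavaScript above.
    sub default_zoom {
        my ($minTime, $maxTime) = @_;
        my $twoMonths = 60 * 24 * 60 * 60 * 1000;
        return (max($minTime, $maxTime - $twoMonths), $maxTime);
    }

    my ($from, $to) = default_zoom(1_420_000_000_000, 1_440_000_000_000);
    print "$from .. $to\n";
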
    [% INCLUDE createChart id="output-size" yaxis="mib" dataUrl=c.uri_for('/job' project.name jobset.name job.name 'output-sizes') %] + [% FOREACH metric IN metrics %] + +

+    <h3>Metric: [% HTML.escape(metric.name) %]</h3>
+
+    [% INCLUDE createChart id="metric-${metric.name}" dataUrl=c.uri_for('/job' project.name jobset.name job.name 'metric' metric.name) %]
+
+  [% END %]
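Each per-metric chart is fed by a JSON endpoint returning the data points for one job and metric in timestamp order. A sketch of the underlying query, with `$db' a Hydra::Model::DB schema:

    use strict;
    use warnings;

    # All data points for one (project, jobset, job, metric), oldest
    # first, as the chart's data URL would serve them.
    sub metric_series {
        my ($db, $project, $jobset, $job, $metric) = @_;
        return [ $db->resultset('BuildMetrics')->search(
            { project => $project, jobset => $jobset,
              job     => $job,     name   => $metric },
            { order_by => "timestamp" })->all ];
    }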