From df29527531754ff269feee3f4d654ca6bdd9d6b6 Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Tue, 7 Jul 2015 10:17:21 +0200 Subject: [PATCH] Refactor --- src/hydra-queue-runner/Makefile.am | 2 +- src/hydra-queue-runner/db.hh | 39 +++ src/hydra-queue-runner/hydra-queue-runner.cc | 327 +------------------ src/hydra-queue-runner/state.hh | 293 +++++++++++++++++ 4 files changed, 335 insertions(+), 326 deletions(-) create mode 100644 src/hydra-queue-runner/db.hh create mode 100644 src/hydra-queue-runner/state.hh diff --git a/src/hydra-queue-runner/Makefile.am b/src/hydra-queue-runner/Makefile.am index 699a22a5..5acc7729 100644 --- a/src/hydra-queue-runner/Makefile.am +++ b/src/hydra-queue-runner/Makefile.am @@ -1,7 +1,7 @@ bin_PROGRAMS = hydra-queue-runner hydra_queue_runner_SOURCES = hydra-queue-runner.cc build-result.cc build-remote.cc \ - build-remote.hh build-result.hh counter.hh pool.hh sync.hh token-server.hh + build-remote.hh build-result.hh counter.hh pool.hh sync.hh token-server.hh state.hh db.hh hydra_queue_runner_LDADD = $(NIX_LIBS) -lpqxx AM_CXXFLAGS = $(NIX_CFLAGS) -Wall diff --git a/src/hydra-queue-runner/db.hh b/src/hydra-queue-runner/db.hh new file mode 100644 index 00000000..9242fd75 --- /dev/null +++ b/src/hydra-queue-runner/db.hh @@ -0,0 +1,39 @@ +#pragma once + +#include + +#include "util.hh" + +using namespace nix; + + +struct Connection : pqxx::connection +{ + Connection() : pqxx::connection(getFlags()) { }; + + string getFlags() + { + string s = getEnv("HYDRA_DBI", "dbi:Pg:dbname=hydra;"); + string prefix = "dbi:Pg:"; + if (string(s, 0, prefix.size()) != prefix) + throw Error("$HYDRA_DBI does not denote a PostgreSQL database"); + return concatStringsSep(" ", tokenizeString(string(s, prefix.size()), ";")); + } +}; + + +struct receiver : public pqxx::notification_receiver +{ + bool status = false; + receiver(pqxx::connection_base & c, const std::string & channel) + : pqxx::notification_receiver(c, channel) { } + void operator() (const string & payload, int pid) override + { + status = true; + }; + bool get() { + bool b = status; + status = false; + return b; + } +}; diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 4d9f59a6..b1f95b6e 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -1,33 +1,19 @@ -#include -#include #include -#include -#include -#include #include #include -#include #include -#include - #include #include #include #include "build-result.hh" #include "build-remote.hh" -#include "sync.hh" -#include "pool.hh" -#include "counter.hh" -#include "token-server.hh" +#include "state.hh" -#include "store-api.hh" -#include "derivations.hh" #include "shared.hh" #include "globals.hh" #include "value-to-json.hh" -#include "pathlocks.hh" using namespace nix; @@ -39,9 +25,6 @@ const float retryBackoff = 3.0; const unsigned int maxParallelCopyClosure = 4; -typedef std::chrono::time_point system_time; - - template bool has(const C & c, const V & v) { @@ -49,314 +32,8 @@ bool has(const C & c, const V & v) } -typedef enum { - bsSuccess = 0, - bsFailed = 1, - bsDepFailed = 2, - bsAborted = 3, - bsFailedWithOutput = 6, - bsTimedOut = 7, - bsUnsupported = 9, -} BuildStatus; - - -typedef enum { - bssSuccess = 0, - bssFailed = 1, - bssAborted = 4, - bssTimedOut = 7, - bssUnsupported = 9, - bssBusy = 100, // not stored -} BuildStepStatus; - - -struct Connection : pqxx::connection -{ - Connection() : pqxx::connection(getFlags()) { }; - - string getFlags() - { - string s = getEnv("HYDRA_DBI", "dbi:Pg:dbname=hydra;"); - string prefix = "dbi:Pg:"; - if (string(s, 0, prefix.size()) != prefix) - throw Error("$HYDRA_DBI does not denote a PostgreSQL database"); - return concatStringsSep(" ", tokenizeString(string(s, prefix.size()), ";")); - } -}; - - -struct receiver : public pqxx::notification_receiver -{ - bool status = false; - receiver(pqxx::connection_base & c, const std::string & channel) - : pqxx::notification_receiver(c, channel) { } - void operator() (const string & payload, int pid) override - { - status = true; - }; - bool get() { - bool b = status; - status = false; - return b; - } -}; - - -typedef unsigned int BuildID; - - -struct Step; - - -struct Build -{ - typedef std::shared_ptr ptr; - typedef std::weak_ptr wptr; - - BuildID id; - Path drvPath; - std::map outputs; - std::string fullJobName; - unsigned int maxSilentTime, buildTimeout; - - std::shared_ptr toplevel; - - std::atomic_bool finishedInDB{false}; - - ~Build() - { - printMsg(lvlDebug, format("destroying build %1%") % id); - } -}; - - -struct Step -{ - typedef std::shared_ptr ptr; - typedef std::weak_ptr wptr; - - Path drvPath; - Derivation drv; - std::set requiredSystemFeatures; - bool preferLocalBuild; - - struct State - { - /* Whether the step has finished initialisation. */ - bool created = false; - - /* The build steps on which this step depends. */ - std::set deps; - - /* The build steps that depend on this step. */ - std::vector rdeps; - - /* Builds that have this step as the top-level derivation. */ - std::vector builds; - - /* Number of times we've tried this step. */ - unsigned int tries = 0; - - /* Point in time after which the step can be retried. */ - system_time after; - }; - - std::atomic_bool finished{false}; // debugging - - Sync state; - - ~Step() - { - //printMsg(lvlError, format("destroying step %1%") % drvPath); - } -}; - - -struct Machine -{ - typedef std::shared_ptr ptr; - - std::string sshName, sshKey; - std::set systemTypes, supportedFeatures, mandatoryFeatures; - unsigned int maxJobs = 1; - float speedFactor = 1.0; - - struct State { - typedef std::shared_ptr ptr; - counter currentJobs{0}; - counter nrStepsDone{0}; - counter totalStepTime{0}; // total time for steps, including closure copying - counter totalStepBuildTime{0}; // total build time for steps - }; - - State::ptr state; - - bool supportsStep(Step::ptr step) - { - if (systemTypes.find(step->drv.platform) == systemTypes.end()) return false; - for (auto & f : mandatoryFeatures) - if (step->requiredSystemFeatures.find(f) == step->requiredSystemFeatures.end() - && !(step->preferLocalBuild && f == "local")) - return false; - for (auto & f : step->requiredSystemFeatures) - if (supportedFeatures.find(f) == supportedFeatures.end()) return false; - return true; - } -}; - - -class State -{ -private: - - Path hydraData, logDir; - - StringSet localPlatforms; - - /* The queued builds. */ - typedef std::map Builds; - Sync builds; - - /* All active or pending build steps (i.e. dependencies of the - queued builds). Note that these are weak pointers. Steps are - kept alive by being reachable from Builds or by being in - progress. */ - typedef std::map Steps; - Sync steps; - - /* Build steps that have no unbuilt dependencies. */ - typedef std::list Runnable; - Sync runnable; - - /* CV for waking up the dispatcher. */ - std::condition_variable dispatcherWakeup; - std::mutex dispatcherMutex; - - /* PostgreSQL connection pool. */ - Pool dbPool; - - /* The build machines. */ - typedef std::map Machines; - Sync machines; // FIXME: use atomic_shared_ptr - - Path machinesFile; - struct stat machinesFileStat; - - /* Token server limiting the number of threads copying closures in - parallel to prevent excessive I/O load. */ - TokenServer copyClosureTokenServer{maxParallelCopyClosure}; - - /* Various stats. */ - time_t startedAt; - counter nrBuildsRead{0}; - counter nrBuildsDone{0}; - counter nrStepsDone{0}; - counter nrActiveSteps{0}; - counter nrStepsBuilding{0}; - counter nrStepsCopyingTo{0}; - counter nrStepsCopyingFrom{0}; - counter nrRetries{0}; - counter maxNrRetries{0}; - counter totalStepTime{0}; // total time for steps, including closure copying - counter totalStepBuildTime{0}; // total build time for steps - counter nrQueueWakeups{0}; - counter nrDispatcherWakeups{0}; - counter bytesSent{0}; - counter bytesReceived{0}; - - /* Log compressor work queue. */ - Sync> logCompressorQueue; - std::condition_variable_any logCompressorWakeup; - - /* Notification sender work queue. FIXME: if hydra-queue-runner is - killed before it has finished sending notifications about a - build, then the notifications may be lost. It would be better - to mark builds with pending notification in the database. */ - typedef std::pair> NotificationItem; - Sync> notificationSenderQueue; - std::condition_variable_any notificationSenderWakeup; - - /* Specific build to do for --build-one (testing only). */ - BuildID buildOne; - -public: - State(); - -private: - - void clearBusy(Connection & conn, time_t stopTime); - - /* (Re)load /etc/nix/machines. */ - void loadMachinesFile(); - - /* Thread to reload /etc/nix/machines periodically. */ - void monitorMachinesFile(); - - int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step, - const std::string & machine, BuildStepStatus status, const std::string & errorMsg = "", - BuildID propagatedFrom = 0); - - void finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, BuildID buildId, int stepNr, - const std::string & machine, BuildStepStatus status, const string & errorMsg = "", - BuildID propagatedFrom = 0); - - void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status); - - void queueMonitor(); - - void queueMonitorLoop(); - - void getQueuedBuilds(Connection & conn, std::shared_ptr store, unsigned int & lastBuildId); - - void removeCancelledBuilds(Connection & conn); - - Step::ptr createStep(std::shared_ptr store, const Path & drvPath, - Build::ptr referringBuild, Step::ptr referringStep, std::set & finishedDrvs, - std::set & newSteps, std::set & newRunnable); - - void makeRunnable(Step::ptr step); - - /* The thread that selects and starts runnable builds. */ - void dispatcher(); - - void wakeDispatcher(); - - void builder(Step::ptr step, Machine::ptr machine, std::shared_ptr reservation); - - /* Perform the given build step. Return true if the step is to be - retried. */ - bool doBuildStep(std::shared_ptr store, Step::ptr step, - Machine::ptr machine); - - void markSucceededBuild(pqxx::work & txn, Build::ptr build, - const BuildResult & res, bool isCachedBuild, time_t startTime, time_t stopTime); - - bool checkCachedFailure(Step::ptr step, Connection & conn); - - /* Thread that asynchronously bzips logs of finished steps. */ - void logCompressor(); - - /* Thread that asynchronously invokes hydra-notify to send build - notifications. */ - void notificationSender(); - - /* Acquire the global queue runner lock, or null if somebody else - has it. */ - std::shared_ptr acquireGlobalLock(); - - void dumpStatus(Connection & conn, bool log); - -public: - - void showStatus(); - - void unlock(); - - void run(BuildID buildOne = 0); -}; - - State::State() + : copyClosureTokenServer{maxParallelCopyClosure} { hydraData = getEnv("HYDRA_DATA"); if (hydraData == "") throw Error("$HYDRA_DATA must be set"); diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh new file mode 100644 index 00000000..ede7ecc6 --- /dev/null +++ b/src/hydra-queue-runner/state.hh @@ -0,0 +1,293 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "db.hh" +#include "counter.hh" +#include "pathlocks.hh" +#include "pool.hh" +#include "sync.hh" +#include "token-server.hh" + +#include "store-api.hh" +#include "derivations.hh" + +using namespace nix; + + +typedef unsigned int BuildID; + +typedef std::chrono::time_point system_time; + + +typedef enum { + bsSuccess = 0, + bsFailed = 1, + bsDepFailed = 2, + bsAborted = 3, + bsFailedWithOutput = 6, + bsTimedOut = 7, + bsUnsupported = 9, +} BuildStatus; + + +typedef enum { + bssSuccess = 0, + bssFailed = 1, + bssAborted = 4, + bssTimedOut = 7, + bssUnsupported = 9, + bssBusy = 100, // not stored +} BuildStepStatus; + + +struct Step; +struct BuildResult; + + +struct Build +{ + typedef std::shared_ptr ptr; + typedef std::weak_ptr wptr; + + BuildID id; + Path drvPath; + std::map outputs; + std::string fullJobName; + unsigned int maxSilentTime, buildTimeout; + + std::shared_ptr toplevel; + + std::atomic_bool finishedInDB{false}; +}; + + +struct Step +{ + typedef std::shared_ptr ptr; + typedef std::weak_ptr wptr; + + Path drvPath; + Derivation drv; + std::set requiredSystemFeatures; + bool preferLocalBuild; + + struct State + { + /* Whether the step has finished initialisation. */ + bool created = false; + + /* The build steps on which this step depends. */ + std::set deps; + + /* The build steps that depend on this step. */ + std::vector rdeps; + + /* Builds that have this step as the top-level derivation. */ + std::vector builds; + + /* Number of times we've tried this step. */ + unsigned int tries = 0; + + /* Point in time after which the step can be retried. */ + system_time after; + }; + + std::atomic_bool finished{false}; // debugging + + Sync state; + + ~Step() + { + //printMsg(lvlError, format("destroying step %1%") % drvPath); + } +}; + + +struct Machine +{ + typedef std::shared_ptr ptr; + + std::string sshName, sshKey; + std::set systemTypes, supportedFeatures, mandatoryFeatures; + unsigned int maxJobs = 1; + float speedFactor = 1.0; + + struct State { + typedef std::shared_ptr ptr; + counter currentJobs{0}; + counter nrStepsDone{0}; + counter totalStepTime{0}; // total time for steps, including closure copying + counter totalStepBuildTime{0}; // total build time for steps + }; + + State::ptr state; + + bool supportsStep(Step::ptr step) + { + if (systemTypes.find(step->drv.platform) == systemTypes.end()) return false; + for (auto & f : mandatoryFeatures) + if (step->requiredSystemFeatures.find(f) == step->requiredSystemFeatures.end() + && !(step->preferLocalBuild && f == "local")) + return false; + for (auto & f : step->requiredSystemFeatures) + if (supportedFeatures.find(f) == supportedFeatures.end()) return false; + return true; + } +}; + + +class State +{ +private: + + Path hydraData, logDir; + + StringSet localPlatforms; + + /* The queued builds. */ + typedef std::map Builds; + Sync builds; + + /* All active or pending build steps (i.e. dependencies of the + queued builds). Note that these are weak pointers. Steps are + kept alive by being reachable from Builds or by being in + progress. */ + typedef std::map Steps; + Sync steps; + + /* Build steps that have no unbuilt dependencies. */ + typedef std::list Runnable; + Sync runnable; + + /* CV for waking up the dispatcher. */ + std::condition_variable dispatcherWakeup; + std::mutex dispatcherMutex; + + /* PostgreSQL connection pool. */ + Pool dbPool; + + /* The build machines. */ + typedef std::map Machines; + Sync machines; // FIXME: use atomic_shared_ptr + + Path machinesFile; + struct stat machinesFileStat; + + /* Token server limiting the number of threads copying closures in + parallel to prevent excessive I/O load. */ + TokenServer copyClosureTokenServer; + + /* Various stats. */ + time_t startedAt; + counter nrBuildsRead{0}; + counter nrBuildsDone{0}; + counter nrStepsDone{0}; + counter nrActiveSteps{0}; + counter nrStepsBuilding{0}; + counter nrStepsCopyingTo{0}; + counter nrStepsCopyingFrom{0}; + counter nrRetries{0}; + counter maxNrRetries{0}; + counter totalStepTime{0}; // total time for steps, including closure copying + counter totalStepBuildTime{0}; // total build time for steps + counter nrQueueWakeups{0}; + counter nrDispatcherWakeups{0}; + counter bytesSent{0}; + counter bytesReceived{0}; + + /* Log compressor work queue. */ + Sync> logCompressorQueue; + std::condition_variable_any logCompressorWakeup; + + /* Notification sender work queue. FIXME: if hydra-queue-runner is + killed before it has finished sending notifications about a + build, then the notifications may be lost. It would be better + to mark builds with pending notification in the database. */ + typedef std::pair> NotificationItem; + Sync> notificationSenderQueue; + std::condition_variable_any notificationSenderWakeup; + + /* Specific build to do for --build-one (testing only). */ + BuildID buildOne; + +public: + State(); + +private: + + void clearBusy(Connection & conn, time_t stopTime); + + /* (Re)load /etc/nix/machines. */ + void loadMachinesFile(); + + /* Thread to reload /etc/nix/machines periodically. */ + void monitorMachinesFile(); + + int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step, + const std::string & machine, BuildStepStatus status, const std::string & errorMsg = "", + BuildID propagatedFrom = 0); + + void finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, BuildID buildId, int stepNr, + const std::string & machine, BuildStepStatus status, const string & errorMsg = "", + BuildID propagatedFrom = 0); + + void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status); + + void queueMonitor(); + + void queueMonitorLoop(); + + void getQueuedBuilds(Connection & conn, std::shared_ptr store, unsigned int & lastBuildId); + + void removeCancelledBuilds(Connection & conn); + + Step::ptr createStep(std::shared_ptr store, const Path & drvPath, + Build::ptr referringBuild, Step::ptr referringStep, std::set & finishedDrvs, + std::set & newSteps, std::set & newRunnable); + + void makeRunnable(Step::ptr step); + + /* The thread that selects and starts runnable builds. */ + void dispatcher(); + + void wakeDispatcher(); + + void builder(Step::ptr step, Machine::ptr machine, std::shared_ptr reservation); + + /* Perform the given build step. Return true if the step is to be + retried. */ + bool doBuildStep(std::shared_ptr store, Step::ptr step, + Machine::ptr machine); + + void markSucceededBuild(pqxx::work & txn, Build::ptr build, + const BuildResult & res, bool isCachedBuild, time_t startTime, time_t stopTime); + + bool checkCachedFailure(Step::ptr step, Connection & conn); + + /* Thread that asynchronously bzips logs of finished steps. */ + void logCompressor(); + + /* Thread that asynchronously invokes hydra-notify to send build + notifications. */ + void notificationSender(); + + /* Acquire the global queue runner lock, or null if somebody else + has it. */ + std::shared_ptr acquireGlobalLock(); + + void dumpStatus(Connection & conn, bool log); + +public: + + void showStatus(); + + void unlock(); + + void run(BuildID buildOne = 0); +};