#pragma once #include #include #include #include #include #include #include "db.hh" #include "counter.hh" #include "pathlocks.hh" #include "pool.hh" #include "sync.hh" #include "token-server.hh" #include "store-api.hh" #include "derivations.hh" using namespace nix; typedef unsigned int BuildID; typedef std::chrono::time_point system_time; typedef enum { bsSuccess = 0, bsFailed = 1, bsDepFailed = 2, bsAborted = 3, bsFailedWithOutput = 6, bsTimedOut = 7, bsUnsupported = 9, } BuildStatus; typedef enum { bssSuccess = 0, bssFailed = 1, bssAborted = 4, bssTimedOut = 7, bssUnsupported = 9, bssBusy = 100, // not stored } BuildStepStatus; struct RemoteResult { enum { rrSuccess = 0, rrPermanentFailure = 1, rrTimedOut = 2, rrMiscFailure = 3 } status = rrMiscFailure; std::string errorMsg; time_t startTime = 0, stopTime = 0; nix::Path logFile; }; struct Step; struct BuildResult; struct Build { typedef std::shared_ptr ptr; typedef std::weak_ptr wptr; BuildID id; Path drvPath; std::map outputs; std::string fullJobName; unsigned int maxSilentTime, buildTimeout; std::shared_ptr toplevel; std::atomic_bool finishedInDB{false}; }; struct Step { typedef std::shared_ptr ptr; typedef std::weak_ptr wptr; Path drvPath; Derivation drv; std::set requiredSystemFeatures; bool preferLocalBuild; struct State { /* Whether the step has finished initialisation. */ bool created = false; /* The build steps on which this step depends. */ std::set deps; /* The build steps that depend on this step. */ std::vector rdeps; /* Builds that have this step as the top-level derivation. */ std::vector builds; /* Number of times we've tried this step. */ unsigned int tries = 0; /* Point in time after which the step can be retried. */ system_time after; }; std::atomic_bool finished{false}; // debugging Sync state; ~Step() { //printMsg(lvlError, format("destroying step %1%") % drvPath); } }; struct Machine { typedef std::shared_ptr ptr; std::string sshName, sshKey; std::set systemTypes, supportedFeatures, mandatoryFeatures; unsigned int maxJobs = 1; float speedFactor = 1.0; struct State { typedef std::shared_ptr ptr; counter currentJobs{0}; counter nrStepsDone{0}; counter totalStepTime{0}; // total time for steps, including closure copying counter totalStepBuildTime{0}; // total build time for steps }; State::ptr state; bool supportsStep(Step::ptr step) { if (systemTypes.find(step->drv.platform) == systemTypes.end()) return false; for (auto & f : mandatoryFeatures) if (step->requiredSystemFeatures.find(f) == step->requiredSystemFeatures.end() && !(step->preferLocalBuild && f == "local")) return false; for (auto & f : step->requiredSystemFeatures) if (supportedFeatures.find(f) == supportedFeatures.end()) return false; return true; } }; class State { private: Path hydraData, logDir; StringSet localPlatforms; /* The queued builds. */ typedef std::map Builds; Sync builds; /* All active or pending build steps (i.e. dependencies of the queued builds). Note that these are weak pointers. Steps are kept alive by being reachable from Builds or by being in progress. */ typedef std::map Steps; Sync steps; /* Build steps that have no unbuilt dependencies. */ typedef std::list Runnable; Sync runnable; /* CV for waking up the dispatcher. */ std::condition_variable dispatcherWakeup; std::mutex dispatcherMutex; /* PostgreSQL connection pool. */ Pool dbPool; /* The build machines. */ typedef std::map Machines; Sync machines; // FIXME: use atomic_shared_ptr Path machinesFile; struct stat machinesFileStat; /* Token server limiting the number of threads copying closures in parallel to prevent excessive I/O load. */ TokenServer copyClosureTokenServer; /* Various stats. */ time_t startedAt; counter nrBuildsRead{0}; counter nrBuildsDone{0}; counter nrStepsDone{0}; counter nrActiveSteps{0}; counter nrStepsBuilding{0}; counter nrStepsCopyingTo{0}; counter nrStepsCopyingFrom{0}; counter nrRetries{0}; counter maxNrRetries{0}; counter totalStepTime{0}; // total time for steps, including closure copying counter totalStepBuildTime{0}; // total build time for steps counter nrQueueWakeups{0}; counter nrDispatcherWakeups{0}; counter bytesSent{0}; counter bytesReceived{0}; /* Log compressor work queue. */ Sync> logCompressorQueue; std::condition_variable_any logCompressorWakeup; /* Notification sender work queue. FIXME: if hydra-queue-runner is killed before it has finished sending notifications about a build, then the notifications may be lost. It would be better to mark builds with pending notification in the database. */ typedef std::pair> NotificationItem; Sync> notificationSenderQueue; std::condition_variable_any notificationSenderWakeup; /* Specific build to do for --build-one (testing only). */ BuildID buildOne; public: State(); private: void clearBusy(Connection & conn, time_t stopTime); /* (Re)load /etc/nix/machines. */ void loadMachinesFile(); /* Thread to reload /etc/nix/machines periodically. */ void monitorMachinesFile(); int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step, const std::string & machine, BuildStepStatus status, const std::string & errorMsg = "", BuildID propagatedFrom = 0); void finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, BuildID buildId, int stepNr, const std::string & machine, BuildStepStatus status, const string & errorMsg = "", BuildID propagatedFrom = 0); void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status); void queueMonitor(); void queueMonitorLoop(); void getQueuedBuilds(Connection & conn, std::shared_ptr store, unsigned int & lastBuildId); void removeCancelledBuilds(Connection & conn); Step::ptr createStep(std::shared_ptr store, const Path & drvPath, Build::ptr referringBuild, Step::ptr referringStep, std::set & finishedDrvs, std::set & newSteps, std::set & newRunnable); void makeRunnable(Step::ptr step); /* The thread that selects and starts runnable builds. */ void dispatcher(); void wakeDispatcher(); void builder(Step::ptr step, Machine::ptr machine, std::shared_ptr reservation); /* Perform the given build step. Return true if the step is to be retried. */ bool doBuildStep(std::shared_ptr store, Step::ptr step, Machine::ptr machine); void buildRemote(std::shared_ptr store, Machine::ptr machine, Step::ptr step, unsigned int maxSilentTime, unsigned int buildTimeout, RemoteResult & result); void markSucceededBuild(pqxx::work & txn, Build::ptr build, const BuildResult & res, bool isCachedBuild, time_t startTime, time_t stopTime); bool checkCachedFailure(Step::ptr step, Connection & conn); /* Thread that asynchronously bzips logs of finished steps. */ void logCompressor(); /* Thread that asynchronously invokes hydra-notify to send build notifications. */ void notificationSender(); /* Acquire the global queue runner lock, or null if somebody else has it. */ std::shared_ptr acquireGlobalLock(); void dumpStatus(Connection & conn, bool log); public: void showStatus(); void unlock(); void run(BuildID buildOne = 0); };