hydra/src/hydra-queue-runner/state.hh

459 lines
13 KiB
C++
Raw Normal View History

2015-07-07 08:17:21 +00:00
#pragma once
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <map>
#include <memory>
#include <queue>
#include "db.hh"
#include "counter.hh"
#include "pathlocks.hh"
#include "pool.hh"
#include "sync.hh"
#include "store-api.hh"
#include "derivations.hh"
2016-02-19 23:04:08 +00:00
#include "binary-cache-store.hh" // FIXME
2015-07-07 08:17:21 +00:00
/* Identifier of a build (a plain unsigned integer). */
using BuildID = unsigned int;

/* Wall-clock time point used throughout the queue runner. */
using system_time = std::chrono::time_point<std::chrono::system_clock>;
/* Final status of a build.

   Every enumerator has an explicit value, and the numbering is not
   contiguous (there is no 4, 5 or 8).
   NOTE(review): presumably these codes are persisted (e.g. in the
   database), so existing values must not be renumbered — confirm. */
enum BuildStatus {
    bsSuccess          = 0,
    bsFailed           = 1,
    bsDepFailed        = 2,
    bsAborted          = 3,
    bsFailedWithOutput = 6,
    bsTimedOut         = 7,
    bsUnsupported      = 9,
    bsLogLimitExceeded = 10,
};
/* Status of a single build step.

   bssBusy is flagged as "not stored", which implies that the remaining
   values are stored and should therefore keep their numeric codes. */
enum BuildStepStatus {
    bssSuccess          = 0,
    bssFailed           = 1,
    bssAborted          = 4,
    bssTimedOut         = 7,
    bssCachedFailure    = 8,
    bssUnsupported      = 9,
    bssLogLimitExceeded = 10,
    bssBusy             = 100, // not stored
};
struct RemoteResult : nix::BuildResult
2015-07-07 08:25:33 +00:00
{
time_t startTime = 0, stopTime = 0;
unsigned int overhead = 0;
2015-07-07 08:25:33 +00:00
nix::Path logFile;
std::shared_ptr<nix::FSAccessor> accessor;
bool canRetry()
{
return status == TransientFailure || status == MiscFailure;
}
2015-07-07 08:25:33 +00:00
};
2015-07-07 08:17:21 +00:00
/* Forward declarations, so that these types can be referred to by
   pointer/reference before their definitions are visible. */
struct BuildOutput;
struct Step;
class Jobset
{
public:
typedef std::shared_ptr<Jobset> ptr;
typedef std::weak_ptr<Jobset> wptr;
static const time_t schedulingWindow = 24 * 60 * 60;
private:
std::atomic<time_t> seconds{0};
std::atomic<unsigned int> shares{1};
/* The start time and duration of the most recent build steps. */
2016-02-24 13:04:31 +00:00
nix::Sync<std::map<time_t, time_t>> steps;
public:
double shareUsed()
{
return (double) seconds / shares;
}
void setShares(int shares_)
{
assert(shares_ > 0);
shares = shares_;
}
time_t getSeconds() { return seconds; }
void addStep(time_t startTime, time_t duration);
void pruneSteps();
};
2015-07-07 08:17:21 +00:00
struct Build
{
typedef std::shared_ptr<Build> ptr;
typedef std::weak_ptr<Build> wptr;
BuildID id;
2015-07-07 08:29:43 +00:00
nix::Path drvPath;
std::map<std::string, nix::Path> outputs;
std::string projectName, jobsetName, jobName;
time_t timestamp;
2015-07-07 08:17:21 +00:00
unsigned int maxSilentTime, buildTimeout;
int localPriority, globalPriority;
2015-07-07 08:17:21 +00:00
std::shared_ptr<Step> toplevel;
Jobset::ptr jobset;
2015-07-07 08:17:21 +00:00
std::atomic_bool finishedInDB{false};
std::string fullJobName()
{
return projectName + ":" + jobsetName + ":" + jobName;
}
void propagatePriorities();
2015-07-07 08:17:21 +00:00
};
struct Step
{
typedef std::shared_ptr<Step> ptr;
typedef std::weak_ptr<Step> wptr;
2015-07-07 08:29:43 +00:00
nix::Path drvPath;
nix::Derivation drv;
2015-07-07 08:17:21 +00:00
std::set<std::string> requiredSystemFeatures;
bool preferLocalBuild;
std::string systemType; // concatenation of drv.platform and requiredSystemFeatures
2015-07-07 08:17:21 +00:00
struct State
{
/* Whether the step has finished initialisation. */
bool created = false;
/* The build steps on which this step depends. */
std::set<Step::ptr> deps;
/* The build steps that depend on this step. */
std::vector<Step::wptr> rdeps;
/* Builds that have this step as the top-level derivation. */
std::vector<Build::wptr> builds;
/* Jobsets to which this step belongs. Used for determining
scheduling priority. */
std::set<Jobset::ptr> jobsets;
2015-07-07 08:17:21 +00:00
/* Number of times we've tried this step. */
unsigned int tries = 0;
/* Point in time after which the step can be retried. */
system_time after;
/* The highest global priority of any build depending on this
step. */
int highestGlobalPriority{0};
/* The highest local priority of any build depending on this
step. */
int highestLocalPriority{0};
/* The lowest ID of any build depending on this step. */
BuildID lowestBuildID{std::numeric_limits<BuildID>::max()};
/* The time at which this step became runnable. */
system_time runnableSince;
2015-07-07 08:17:21 +00:00
};
std::atomic_bool finished{false}; // debugging
2016-02-24 13:04:31 +00:00
nix::Sync<State> state;
2015-07-07 08:17:21 +00:00
~Step()
{
//printMsg(lvlError, format("destroying step %1%") % drvPath);
}
};
2015-07-21 13:14:17 +00:00
/* Call visitor for a step and all its dependencies. */
void visitDependencies(std::function<void(Step::ptr)> visitor, Step::ptr step);

/* Fill 'builds' and 'steps' starting from 'step'.
   NOTE(review): judging by the name, these are the builds and steps that
   depend on 'step'; the definition is not visible here — confirm. */
void getDependents(Step::ptr step,
    std::set<Build::ptr> & builds,
    std::set<Step::ptr> & steps);
struct Machine
{
typedef std::shared_ptr<Machine> ptr;
bool enabled{true};
2015-07-07 08:17:21 +00:00
std::string sshName, sshKey;
std::set<std::string> systemTypes, supportedFeatures, mandatoryFeatures;
unsigned int maxJobs = 1;
float speedFactor = 1.0;
std::string sshPublicHostKey;
2015-07-07 08:17:21 +00:00
struct State {
typedef std::shared_ptr<State> ptr;
counter currentJobs{0};
counter nrStepsDone{0};
counter totalStepTime{0}; // total time for steps, including closure copying
counter totalStepBuildTime{0}; // total build time for steps
std::atomic<time_t> idleSince{0};
struct ConnectInfo
{
system_time lastFailure, disabledUntil;
unsigned int consecutiveFailures;
};
2016-02-24 13:04:31 +00:00
nix::Sync<ConnectInfo> connectInfo;
/* Mutex to prevent multiple threads from sending data to the
same machine (which would be inefficient). */
std::mutex sendLock;
2015-07-07 08:17:21 +00:00
};
State::ptr state;
bool supportsStep(Step::ptr step)
{
if (systemTypes.find(step->drv.platform) == systemTypes.end()) return false;
for (auto & f : mandatoryFeatures)
if (step->requiredSystemFeatures.find(f) == step->requiredSystemFeatures.end()
&& !(step->preferLocalBuild && f == "local"))
return false;
for (auto & f : step->requiredSystemFeatures)
if (supportedFeatures.find(f) == supportedFeatures.end()) return false;
return true;
}
};
class State
{
private:
2015-07-21 13:14:17 +00:00
// FIXME: Make configurable.
const unsigned int maxTries = 5;
const unsigned int retryInterval = 60; // seconds
const float retryBackoff = 3.0;
const unsigned int maxParallelCopyClosure = 4;
2015-07-07 08:29:43 +00:00
nix::Path hydraData, logDir;
2015-07-07 08:17:21 +00:00
std::map<std::string, std::string> hydraConfig;
2015-07-07 08:17:21 +00:00
/* The queued builds. */
typedef std::map<BuildID, Build::ptr> Builds;
2016-02-24 13:04:31 +00:00
nix::Sync<Builds> builds;
2015-07-07 08:17:21 +00:00
/* The jobsets. */
typedef std::map<std::pair<std::string, std::string>, Jobset::ptr> Jobsets;
2016-02-24 13:04:31 +00:00
nix::Sync<Jobsets> jobsets;
2015-07-07 08:17:21 +00:00
/* All active or pending build steps (i.e. dependencies of the
queued builds). Note that these are weak pointers. Steps are
kept alive by being reachable from Builds or by being in
progress. */
2015-07-07 08:29:43 +00:00
typedef std::map<nix::Path, Step::wptr> Steps;
2016-02-24 13:04:31 +00:00
nix::Sync<Steps> steps;
2015-07-07 08:17:21 +00:00
/* Build steps that have no unbuilt dependencies. */
typedef std::list<Step::wptr> Runnable;
2016-02-24 13:04:31 +00:00
nix::Sync<Runnable> runnable;
2015-07-07 08:17:21 +00:00
/* CV for waking up the dispatcher. */
2016-02-24 13:04:31 +00:00
nix::Sync<bool> dispatcherWakeup;
std::condition_variable dispatcherWakeupCV;
2015-07-07 08:17:21 +00:00
/* PostgreSQL connection pool. */
2016-02-24 13:04:31 +00:00
nix::Pool<Connection> dbPool;
2015-07-07 08:17:21 +00:00
/* The build machines. */
2015-07-07 08:29:43 +00:00
typedef std::map<std::string, Machine::ptr> Machines;
2016-02-24 13:04:31 +00:00
nix::Sync<Machines> machines; // FIXME: use atomic_shared_ptr
2015-07-07 08:17:21 +00:00
/* Various stats. */
time_t startedAt;
counter nrBuildsRead{0};
counter nrBuildsDone{0};
counter nrStepsDone{0};
counter nrActiveSteps{0};
counter nrStepsBuilding{0};
counter nrStepsCopyingTo{0};
counter nrStepsCopyingFrom{0};
counter nrStepsWaiting{0};
2015-07-07 08:17:21 +00:00
counter nrRetries{0};
counter maxNrRetries{0};
counter totalStepTime{0}; // total time for steps, including closure copying
counter totalStepBuildTime{0}; // total build time for steps
counter nrQueueWakeups{0};
counter nrDispatcherWakeups{0};
counter bytesSent{0};
counter bytesReceived{0};
2016-02-29 14:10:30 +00:00
counter nrActiveDbUpdates{0};
2015-07-07 08:17:21 +00:00
/* Log compressor work queue. */
2016-02-24 13:04:31 +00:00
nix::Sync<std::queue<nix::Path>> logCompressorQueue;
std::condition_variable logCompressorWakeup;
2015-07-07 08:17:21 +00:00
/* Notification sender work queue. FIXME: if hydra-queue-runner is
killed before it has finished sending notifications about a
build, then the notifications may be lost. It would be better
to mark builds with pending notification in the database. */
typedef std::pair<BuildID, std::vector<BuildID>> NotificationItem;
2016-02-24 13:04:31 +00:00
nix::Sync<std::queue<NotificationItem>> notificationSenderQueue;
std::condition_variable notificationSenderWakeup;
2015-07-07 08:17:21 +00:00
/* Specific build to do for --build-one (testing only). */
BuildID buildOne;
/* Statistics per machine type for the Hydra auto-scaler. */
struct MachineType
{
unsigned int runnable{0}, running{0};
system_time lastActive;
std::chrono::seconds waitTime; // time runnable steps have been waiting
};
2016-02-24 13:04:31 +00:00
nix::Sync<std::map<std::string, MachineType>> machineTypes;
struct MachineReservation
{
typedef std::shared_ptr<MachineReservation> ptr;
State & state;
Step::ptr step;
Machine::ptr machine;
MachineReservation(State & state, Step::ptr step, Machine::ptr machine);
~MachineReservation();
};
std::atomic<time_t> lastDispatcherCheck{0};
2016-02-24 13:04:31 +00:00
std::shared_ptr<nix::Store> _localStore;
std::shared_ptr<nix::Store> _destStore;
2015-07-07 08:17:21 +00:00
public:
State();
private:
2016-02-29 14:10:30 +00:00
MaintainCount startDbUpdate();
/* Return a store object that can access derivations produced by
hydra-evaluator. */
2016-02-24 13:04:31 +00:00
nix::ref<nix::Store> getLocalStore();
/* Return a store object to store build results. */
nix::ref<nix::Store> getDestStore();
2015-07-07 08:17:21 +00:00
void clearBusy(Connection & conn, time_t stopTime);
void parseMachines(const std::string & contents);
2015-07-07 08:17:21 +00:00
/* Thread to reload /etc/nix/machines periodically. */
void monitorMachinesFile();
int allocBuildStep(pqxx::work & txn, Build::ptr build);
2015-07-07 08:17:21 +00:00
int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step,
const std::string & machine, BuildStepStatus status, const std::string & errorMsg = "",
BuildID propagatedFrom = 0);
void finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime,
unsigned int overhead, BuildID buildId, int stepNr,
2015-07-07 08:29:43 +00:00
const std::string & machine, BuildStepStatus status, const std::string & errorMsg = "",
2015-07-07 08:17:21 +00:00
BuildID propagatedFrom = 0);
int createSubstitutionStep(pqxx::work & txn, time_t startTime, time_t stopTime,
Build::ptr build, const nix::Path & drvPath, const std::string & outputName, const nix::Path & storePath);
2015-07-07 08:17:21 +00:00
void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status);
void queueMonitor();
void queueMonitorLoop();
/* Check the queue for new builds. */
bool getQueuedBuilds(Connection & conn, nix::ref<nix::Store> localStore,
nix::ref<nix::Store> destStore, unsigned int & lastBuildId);
2015-07-07 08:17:21 +00:00
/* Handle cancellation, deletion and priority bumps. */
void processQueueChange(Connection & conn);
2015-07-07 08:17:21 +00:00
2016-02-11 14:59:47 +00:00
Step::ptr createStep(nix::ref<nix::Store> store,
Connection & conn, Build::ptr build, const nix::Path & drvPath,
2015-07-07 08:29:43 +00:00
Build::ptr referringBuild, Step::ptr referringStep, std::set<nix::Path> & finishedDrvs,
2015-07-07 08:17:21 +00:00
std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable);
Jobset::ptr createJobset(pqxx::work & txn,
const std::string & projectName, const std::string & jobsetName);
void processJobsetSharesChange(Connection & conn);
2015-07-07 08:17:21 +00:00
void makeRunnable(Step::ptr step);
/* The thread that selects and starts runnable builds. */
void dispatcher();
2015-08-10 09:26:30 +00:00
system_time doDispatch();
2015-07-07 08:17:21 +00:00
void wakeDispatcher();
void builder(MachineReservation::ptr reservation);
2015-07-07 08:17:21 +00:00
/* Perform the given build step. Return true if the step is to be
retried. */
bool doBuildStep(nix::ref<nix::Store> destStore, Step::ptr step,
2015-07-07 08:17:21 +00:00
Machine::ptr machine);
void buildRemote(nix::ref<nix::Store> destStore,
2015-07-07 08:25:33 +00:00
Machine::ptr machine, Step::ptr step,
unsigned int maxSilentTime, unsigned int buildTimeout,
RemoteResult & result);
2015-07-07 08:17:21 +00:00
void markSucceededBuild(pqxx::work & txn, Build::ptr build,
const BuildOutput & res, bool isCachedBuild, time_t startTime, time_t stopTime);
2015-07-07 08:17:21 +00:00
bool checkCachedFailure(Step::ptr step, Connection & conn);
/* Thread that asynchronously bzips logs of finished steps. */
void logCompressor();
/* Thread that asynchronously invokes hydra-notify to send build
notifications. */
void notificationSender();
/* Acquire the global queue runner lock, or null if somebody else
has it. */
2015-07-07 08:29:43 +00:00
std::shared_ptr<nix::PathLocks> acquireGlobalLock();
2015-07-07 08:17:21 +00:00
void dumpStatus(Connection & conn, bool log);
public:
void showStatus();
void unlock();
void run(BuildID buildOne = 0);
};