This commit is contained in:
Eelco Dolstra 2015-07-07 10:17:21 +02:00
parent dd4f6e695e
commit df29527531
4 changed files with 335 additions and 326 deletions

View file

@ -1,7 +1,7 @@
bin_PROGRAMS = hydra-queue-runner
hydra_queue_runner_SOURCES = hydra-queue-runner.cc build-result.cc build-remote.cc \
build-remote.hh build-result.hh counter.hh pool.hh sync.hh token-server.hh
build-remote.hh build-result.hh counter.hh pool.hh sync.hh token-server.hh state.hh db.hh
hydra_queue_runner_LDADD = $(NIX_LIBS) -lpqxx
AM_CXXFLAGS = $(NIX_CFLAGS) -Wall

View file

@ -0,0 +1,39 @@
#pragma once
#include <pqxx/pqxx>
#include "util.hh"
using namespace nix;
/* A PostgreSQL connection whose connection string is derived from the
   $HYDRA_DBI environment variable. */
struct Connection : pqxx::connection
{
    Connection() : pqxx::connection(getFlags()) { }

    /* Convert $HYDRA_DBI (default "dbi:Pg:dbname=hydra;") into the
       string passed to the pqxx::connection constructor: the
       "dbi:Pg:" prefix is stripped and the remaining ';'-separated
       items are joined with single spaces.

       Throws Error if the value does not start with "dbi:Pg:".

       This function is static on purpose: it is evaluated in the
       constructor's mem-initializer list, i.e. before the
       pqxx::connection base sub-object exists, and calling a
       non-static member function at that point is undefined
       behaviour. */
    static string getFlags()
    {
        const string prefix = "dbi:Pg:";
        const string s = getEnv("HYDRA_DBI", "dbi:Pg:dbname=hydra;");
        if (s.compare(0, prefix.size(), prefix) != 0)
            throw Error("$HYDRA_DBI does not denote a PostgreSQL database");
        /* The prefix check above guarantees s.size() >= prefix.size(). */
        return concatStringsSep(" ", tokenizeString<Strings>(string(s, prefix.size()), ";"));
    }
};
/* Notification receiver that records whether at least one
   notification has been delivered since the flag was last read. */
struct receiver : public pqxx::notification_receiver
{
    bool status = false;

    receiver(pqxx::connection_base & c, const std::string & channel)
        : pqxx::notification_receiver(c, channel)
    { }

    /* Notification callback: only sets the flag; the payload and
       the pid are not looked at. */
    void operator() (const string & payload, int pid) override
    {
        status = true;
    }

    /* Report whether the flag was set and reset it in the same call. */
    bool get()
    {
        if (!status) return false;
        status = false;
        return true;
    }
};

View file

@ -1,33 +1,19 @@
#include <atomic>
#include <condition_variable>
#include <iostream>
#include <map>
#include <queue>
#include <memory>
#include <thread>
#include <cmath>
#include <chrono>
#include <algorithm>
#include <pqxx/pqxx>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "build-result.hh"
#include "build-remote.hh"
#include "sync.hh"
#include "pool.hh"
#include "counter.hh"
#include "token-server.hh"
#include "state.hh"
#include "store-api.hh"
#include "derivations.hh"
#include "shared.hh"
#include "globals.hh"
#include "value-to-json.hh"
#include "pathlocks.hh"
using namespace nix;
@ -39,9 +25,6 @@ const float retryBackoff = 3.0;
/* Initial value handed to State::copyClosureTokenServer, the token
   server that limits parallel closure copying. */
const unsigned int maxParallelCopyClosure = 4;

/* A point in time on the system (wall) clock. */
using system_time = std::chrono::time_point<std::chrono::system_clock>;
template <class C, class V>
bool has(const C & c, const V & v)
{
@ -49,314 +32,8 @@ bool has(const C & c, const V & v)
}
/* Outcome of a whole build. The numeric values are assigned
   explicitly and are not contiguous.
   NOTE(review): presumably they mirror the values stored in the
   database and must therefore not be renumbered -- confirm. */
enum BuildStatus {
    bsSuccess = 0,
    bsFailed = 1,
    bsDepFailed = 2,
    bsAborted = 3,
    bsFailedWithOutput = 6,
    bsTimedOut = 7,
    bsUnsupported = 9,
};
/* Outcome of a single build step. The numeric values are assigned
   explicitly and are not contiguous.
   NOTE(review): apart from bssBusy they are presumably the values
   stored in the database -- confirm before renumbering. */
enum BuildStepStatus {
    bssSuccess = 0,
    bssFailed = 1,
    bssAborted = 4,
    bssTimedOut = 7,
    bssUnsupported = 9,
    bssBusy = 100, // not stored
};
/* A PostgreSQL connection whose connection string is derived from the
   $HYDRA_DBI environment variable. */
struct Connection : pqxx::connection
{
    Connection() : pqxx::connection(getFlags()) { }

    /* Convert $HYDRA_DBI (default "dbi:Pg:dbname=hydra;") into the
       string passed to the pqxx::connection constructor: the
       "dbi:Pg:" prefix is stripped and the remaining ';'-separated
       items are joined with single spaces.

       Throws Error if the value does not start with "dbi:Pg:".

       This function is static on purpose: it is evaluated in the
       constructor's mem-initializer list, i.e. before the
       pqxx::connection base sub-object exists, and calling a
       non-static member function at that point is undefined
       behaviour. */
    static string getFlags()
    {
        const string prefix = "dbi:Pg:";
        const string s = getEnv("HYDRA_DBI", "dbi:Pg:dbname=hydra;");
        if (s.compare(0, prefix.size(), prefix) != 0)
            throw Error("$HYDRA_DBI does not denote a PostgreSQL database");
        /* The prefix check above guarantees s.size() >= prefix.size(). */
        return concatStringsSep(" ", tokenizeString<Strings>(string(s, prefix.size()), ";"));
    }
};
/* Notification receiver that records whether at least one
   notification has been delivered since the flag was last read. */
struct receiver : public pqxx::notification_receiver
{
    bool status = false;

    receiver(pqxx::connection_base & c, const std::string & channel)
        : pqxx::notification_receiver(c, channel)
    { }

    /* Notification callback: only sets the flag; the payload and
       the pid are not looked at. */
    void operator() (const string & payload, int pid) override
    {
        status = true;
    }

    /* Report whether the flag was set and reset it in the same call. */
    bool get()
    {
        if (!status) return false;
        status = false;
        return true;
    }
};
/* Numeric identifier of a build. */
using BuildID = unsigned int;
struct Step;
/* A queued build. Instances are shared through Build::ptr and
   referenced weakly through Build::wptr. */
struct Build
{
    typedef std::shared_ptr<Build> ptr;
    typedef std::weak_ptr<Build> wptr;

    /* Scalar members carry in-class initialisers so that a
       default-initialised Build never holds indeterminate values. */
    BuildID id = 0;
    Path drvPath;
    std::map<string, Path> outputs;
    std::string fullJobName;
    unsigned int maxSilentTime = 0;
    unsigned int buildTimeout = 0;

    /* NOTE(review): presumably the step for this build's own
       derivation (see Step::State::builds) -- confirm. */
    std::shared_ptr<Step> toplevel;

    std::atomic_bool finishedInDB{false};

    /* Logs the destruction of the build at debug level. */
    ~Build()
    {
        printMsg(lvlDebug, format("destroying build %1%") % id);
    }
};
/* A build step, identified by the derivation in `drvPath` / `drv`.
   Instances are shared through Step::ptr and referenced weakly
   through Step::wptr. */
struct Step
{
    typedef std::shared_ptr<Step> ptr;
    typedef std::weak_ptr<Step> wptr;

    Path drvPath;
    Derivation drv;
    std::set<std::string> requiredSystemFeatures;

    /* Initialised explicitly: Machine::supportsStep() reads this
       flag, so it must never be indeterminate. */
    bool preferLocalBuild = false;

    /* The mutable part of a step, wrapped in Sync<> below. */
    struct State
    {
        /* Whether the step has finished initialisation. */
        bool created = false;

        /* The build steps on which this step depends. */
        std::set<Step::ptr> deps;

        /* The build steps that depend on this step. */
        std::vector<Step::wptr> rdeps;

        /* Builds that have this step as the top-level derivation. */
        std::vector<Build::wptr> builds;

        /* Number of times we've tried this step. */
        unsigned int tries = 0;

        /* Point in time after which the step can be retried. */
        system_time after;
    };

    std::atomic_bool finished{false}; // debugging

    Sync<State> state;

    ~Step()
    {
        //printMsg(lvlError, format("destroying step %1%") % drvPath);
    }
};
/* Description of a build machine together with its run-time
   counters. */
struct Machine
{
    using ptr = std::shared_ptr<Machine>;

    std::string sshName, sshKey;
    std::set<std::string> systemTypes, supportedFeatures, mandatoryFeatures;
    unsigned int maxJobs = 1;
    float speedFactor = 1.0;

    /* Counters kept behind a shared pointer (see `state` below). */
    struct State {
        using ptr = std::shared_ptr<State>;
        counter currentJobs{0};
        counter nrStepsDone{0};
        counter totalStepTime{0}; // total time for steps, including closure copying
        counter totalStepBuildTime{0}; // total build time for steps
    };

    State::ptr state;

    /* Return true iff this machine can run `step`:
       - the step's platform is one of `systemTypes`;
       - every mandatory feature of the machine is required by the
         step, except that "local" is waived for steps with
         preferLocalBuild set;
       - every feature required by the step is supported here. */
    bool supportsStep(Step::ptr step)
    {
        if (systemTypes.count(step->drv.platform) == 0) return false;

        for (auto & feature : mandatoryFeatures) {
            const bool requested = step->requiredSystemFeatures.count(feature) != 0;
            const bool waived = step->preferLocalBuild && feature == "local";
            if (!requested && !waived) return false;
        }

        for (auto & feature : step->requiredSystemFeatures) {
            if (supportedFeatures.count(feature) == 0) return false;
        }

        return true;
    }
};
/* Shared data of the queue runner (queued builds, steps, runnable
steps, build machines, statistics counters and work queues)
together with the member functions that operate on it. */
class State
{
private:
/* hydraData is filled from $HYDRA_DATA by the constructor.
NOTE(review): logDir is presumably the directory holding build
logs -- confirm where it is assigned. */
Path hydraData, logDir;
/* NOTE(review): presumably the platforms that can be built on the
local machine -- confirm against the code that fills it. */
StringSet localPlatforms;
/* The queued builds. */
typedef std::map<BuildID, Build::ptr> Builds;
Sync<Builds> builds;
/* All active or pending build steps (i.e. dependencies of the
queued builds). Note that these are weak pointers. Steps are
kept alive by being reachable from Builds or by being in
progress. */
typedef std::map<Path, Step::wptr> Steps;
Sync<Steps> steps;
/* Build steps that have no unbuilt dependencies. */
typedef std::list<Step::wptr> Runnable;
Sync<Runnable> runnable;
/* CV for waking up the dispatcher. */
std::condition_variable dispatcherWakeup;
/* NOTE(review): presumably the mutex used together with
dispatcherWakeup -- confirm in dispatcher(). */
std::mutex dispatcherMutex;
/* PostgreSQL connection pool. */
Pool<Connection> dbPool;
/* The build machines. */
typedef std::map<string, Machine::ptr> Machines;
Sync<Machines> machines; // FIXME: use atomic_shared_ptr
/* Path of the machines file and a stat record for it.
NOTE(review): presumably compared by monitorMachinesFile() to
detect changes -- confirm. */
Path machinesFile;
struct stat machinesFileStat;
/* Token server limiting the number of threads copying closures in
parallel to prevent excessive I/O load. */
TokenServer copyClosureTokenServer{maxParallelCopyClosure};
/* Various stats. */
time_t startedAt;
counter nrBuildsRead{0};
counter nrBuildsDone{0};
counter nrStepsDone{0};
counter nrActiveSteps{0};
counter nrStepsBuilding{0};
counter nrStepsCopyingTo{0};
counter nrStepsCopyingFrom{0};
counter nrRetries{0};
counter maxNrRetries{0};
counter totalStepTime{0}; // total time for steps, including closure copying
counter totalStepBuildTime{0}; // total build time for steps
counter nrQueueWakeups{0};
counter nrDispatcherWakeups{0};
counter bytesSent{0};
counter bytesReceived{0};
/* Log compressor work queue. */
Sync<std::queue<Path>> logCompressorQueue;
std::condition_variable_any logCompressorWakeup;
/* Notification sender work queue. FIXME: if hydra-queue-runner is
killed before it has finished sending notifications about a
build, then the notifications may be lost. It would be better
to mark builds with pending notification in the database. */
typedef std::pair<BuildID, std::vector<BuildID>> NotificationItem;
Sync<std::queue<NotificationItem>> notificationSenderQueue;
std::condition_variable_any notificationSenderWakeup;
/* Specific build to do for --build-one (testing only). */
BuildID buildOne;
public:
State();
private:
/* NOTE(review): presumably finishes build steps that are still
marked busy in the database, using stopTime -- confirm in the
definition. */
void clearBusy(Connection & conn, time_t stopTime);
/* (Re)load /etc/nix/machines. */
void loadMachinesFile();
/* Thread to reload /etc/nix/machines periodically. */
void monitorMachinesFile();
/* Database helpers operating inside the caller's transaction `txn`.
errorMsg defaults to "" and propagatedFrom to 0.
NOTE(review): createBuildStep presumably returns the number of the
new step (cf. the stepNr parameter of finishBuildStep) -- confirm. */
int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step,
const std::string & machine, BuildStepStatus status, const std::string & errorMsg = "",
BuildID propagatedFrom = 0);
void finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, BuildID buildId, int stepNr,
const std::string & machine, BuildStepStatus status, const string & errorMsg = "",
BuildID propagatedFrom = 0);
void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status);
/* Queue monitoring. lastBuildId is passed by non-const reference,
so getQueuedBuilds can update it for the caller. */
void queueMonitor();
void queueMonitorLoop();
void getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId);
void removeCancelledBuilds(Connection & conn);
/* Create the step for drvPath. finishedDrvs, newSteps and
newRunnable are passed by non-const reference and can be
modified by the call. */
Step::ptr createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,
Build::ptr referringBuild, Step::ptr referringStep, std::set<Path> & finishedDrvs,
std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable);
void makeRunnable(Step::ptr step);
/* The thread that selects and starts runnable builds. */
void dispatcher();
void wakeDispatcher();
/* NOTE(review): presumably runs `step` on `machine` while holding
`reservation` -- confirm in the definition. */
void builder(Step::ptr step, Machine::ptr machine, std::shared_ptr<MaintainCount> reservation);
/* Perform the given build step. Return true if the step is to be
retried. */
bool doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
Machine::ptr machine);
void markSucceededBuild(pqxx::work & txn, Build::ptr build,
const BuildResult & res, bool isCachedBuild, time_t startTime, time_t stopTime);
bool checkCachedFailure(Step::ptr step, Connection & conn);
/* Thread that asynchronously bzips logs of finished steps. */
void logCompressor();
/* Thread that asynchronously invokes hydra-notify to send build
notifications. */
void notificationSender();
/* Acquire the global queue runner lock, or null if somebody else
has it. */
std::shared_ptr<PathLocks> acquireGlobalLock();
void dumpStatus(Connection & conn, bool log);
public:
/* Public entry points. run() takes an optional build id that
defaults to 0.
NOTE(review): presumably the --build-one build stored in the
buildOne member -- confirm. */
void showStatus();
void unlock();
void run(BuildID buildOne = 0);
};
State::State()
: copyClosureTokenServer{maxParallelCopyClosure}
{
hydraData = getEnv("HYDRA_DATA");
if (hydraData == "") throw Error("$HYDRA_DATA must be set");

View file

@ -0,0 +1,293 @@
#pragma once
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <map>
#include <memory>
#include <queue>
#include "db.hh"
#include "counter.hh"
#include "pathlocks.hh"
#include "pool.hh"
#include "sync.hh"
#include "token-server.hh"
#include "store-api.hh"
#include "derivations.hh"
using namespace nix;
/* Numeric identifier of a build. */
using BuildID = unsigned int;

/* A point in time on the system (wall) clock. */
using system_time = std::chrono::time_point<std::chrono::system_clock>;
/* Outcome of a whole build. The numeric values are assigned
   explicitly and are not contiguous.
   NOTE(review): presumably they mirror the values stored in the
   database and must therefore not be renumbered -- confirm. */
enum BuildStatus {
    bsSuccess = 0,
    bsFailed = 1,
    bsDepFailed = 2,
    bsAborted = 3,
    bsFailedWithOutput = 6,
    bsTimedOut = 7,
    bsUnsupported = 9,
};
/* Outcome of a single build step. The numeric values are assigned
   explicitly and are not contiguous.
   NOTE(review): apart from bssBusy they are presumably the values
   stored in the database -- confirm before renumbering. */
enum BuildStepStatus {
    bssSuccess = 0,
    bssFailed = 1,
    bssAborted = 4,
    bssTimedOut = 7,
    bssUnsupported = 9,
    bssBusy = 100, // not stored
};
struct Step;
struct BuildResult;
/* A queued build. Instances are shared through Build::ptr and
   referenced weakly through Build::wptr. */
struct Build
{
    typedef std::shared_ptr<Build> ptr;
    typedef std::weak_ptr<Build> wptr;

    /* Scalar members carry in-class initialisers so that a
       default-initialised Build never holds indeterminate values. */
    BuildID id = 0;
    Path drvPath;
    std::map<string, Path> outputs;
    std::string fullJobName;
    unsigned int maxSilentTime = 0;
    unsigned int buildTimeout = 0;

    /* NOTE(review): presumably the step for this build's own
       derivation (see Step::State::builds) -- confirm. */
    std::shared_ptr<Step> toplevel;

    std::atomic_bool finishedInDB{false};
};
/* A build step, identified by the derivation in `drvPath` / `drv`.
   Instances are shared through Step::ptr and referenced weakly
   through Step::wptr. */
struct Step
{
    typedef std::shared_ptr<Step> ptr;
    typedef std::weak_ptr<Step> wptr;

    Path drvPath;
    Derivation drv;
    std::set<std::string> requiredSystemFeatures;

    /* Initialised explicitly: Machine::supportsStep() reads this
       flag, so it must never be indeterminate. */
    bool preferLocalBuild = false;

    /* The mutable part of a step, wrapped in Sync<> below. */
    struct State
    {
        /* Whether the step has finished initialisation. */
        bool created = false;

        /* The build steps on which this step depends. */
        std::set<Step::ptr> deps;

        /* The build steps that depend on this step. */
        std::vector<Step::wptr> rdeps;

        /* Builds that have this step as the top-level derivation. */
        std::vector<Build::wptr> builds;

        /* Number of times we've tried this step. */
        unsigned int tries = 0;

        /* Point in time after which the step can be retried. */
        system_time after;
    };

    std::atomic_bool finished{false}; // debugging

    Sync<State> state;

    ~Step()
    {
        //printMsg(lvlError, format("destroying step %1%") % drvPath);
    }
};
/* Description of a build machine together with its run-time
   counters. */
struct Machine
{
    using ptr = std::shared_ptr<Machine>;

    std::string sshName, sshKey;
    std::set<std::string> systemTypes, supportedFeatures, mandatoryFeatures;
    unsigned int maxJobs = 1;
    float speedFactor = 1.0;

    /* Counters kept behind a shared pointer (see `state` below). */
    struct State {
        using ptr = std::shared_ptr<State>;
        counter currentJobs{0};
        counter nrStepsDone{0};
        counter totalStepTime{0}; // total time for steps, including closure copying
        counter totalStepBuildTime{0}; // total build time for steps
    };

    State::ptr state;

    /* Return true iff this machine can run `step`:
       - the step's platform is one of `systemTypes`;
       - every mandatory feature of the machine is required by the
         step, except that "local" is waived for steps with
         preferLocalBuild set;
       - every feature required by the step is supported here. */
    bool supportsStep(Step::ptr step)
    {
        if (systemTypes.count(step->drv.platform) == 0) return false;

        for (auto & feature : mandatoryFeatures) {
            const bool requested = step->requiredSystemFeatures.count(feature) != 0;
            const bool waived = step->preferLocalBuild && feature == "local";
            if (!requested && !waived) return false;
        }

        for (auto & feature : step->requiredSystemFeatures) {
            if (supportedFeatures.count(feature) == 0) return false;
        }

        return true;
    }
};
/* Shared data of the queue runner (queued builds, steps, runnable
steps, build machines, statistics counters and work queues)
together with the member functions that operate on it. */
class State
{
private:
/* NOTE(review): hydraData is presumably filled from $HYDRA_DATA by
the constructor and logDir is presumably the directory holding
build logs -- confirm in the definitions. */
Path hydraData, logDir;
/* NOTE(review): presumably the platforms that can be built on the
local machine -- confirm against the code that fills it. */
StringSet localPlatforms;
/* The queued builds. */
typedef std::map<BuildID, Build::ptr> Builds;
Sync<Builds> builds;
/* All active or pending build steps (i.e. dependencies of the
queued builds). Note that these are weak pointers. Steps are
kept alive by being reachable from Builds or by being in
progress. */
typedef std::map<Path, Step::wptr> Steps;
Sync<Steps> steps;
/* Build steps that have no unbuilt dependencies. */
typedef std::list<Step::wptr> Runnable;
Sync<Runnable> runnable;
/* CV for waking up the dispatcher. */
std::condition_variable dispatcherWakeup;
/* NOTE(review): presumably the mutex used together with
dispatcherWakeup -- confirm in dispatcher(). */
std::mutex dispatcherMutex;
/* PostgreSQL connection pool. */
Pool<Connection> dbPool;
/* The build machines. */
typedef std::map<string, Machine::ptr> Machines;
Sync<Machines> machines; // FIXME: use atomic_shared_ptr
/* Path of the machines file and a stat record for it.
NOTE(review): presumably compared by monitorMachinesFile() to
detect changes -- confirm. */
Path machinesFile;
struct stat machinesFileStat;
/* Token server limiting the number of threads copying closures in
parallel to prevent excessive I/O load. */
TokenServer copyClosureTokenServer;
/* Various stats. */
time_t startedAt;
counter nrBuildsRead{0};
counter nrBuildsDone{0};
counter nrStepsDone{0};
counter nrActiveSteps{0};
counter nrStepsBuilding{0};
counter nrStepsCopyingTo{0};
counter nrStepsCopyingFrom{0};
counter nrRetries{0};
counter maxNrRetries{0};
counter totalStepTime{0}; // total time for steps, including closure copying
counter totalStepBuildTime{0}; // total build time for steps
counter nrQueueWakeups{0};
counter nrDispatcherWakeups{0};
counter bytesSent{0};
counter bytesReceived{0};
/* Log compressor work queue. */
Sync<std::queue<Path>> logCompressorQueue;
std::condition_variable_any logCompressorWakeup;
/* Notification sender work queue. FIXME: if hydra-queue-runner is
killed before it has finished sending notifications about a
build, then the notifications may be lost. It would be better
to mark builds with pending notification in the database. */
typedef std::pair<BuildID, std::vector<BuildID>> NotificationItem;
Sync<std::queue<NotificationItem>> notificationSenderQueue;
std::condition_variable_any notificationSenderWakeup;
/* Specific build to do for --build-one (testing only). */
BuildID buildOne;
public:
State();
private:
/* NOTE(review): presumably finishes build steps that are still
marked busy in the database, using stopTime -- confirm in the
definition. */
void clearBusy(Connection & conn, time_t stopTime);
/* (Re)load /etc/nix/machines. */
void loadMachinesFile();
/* Thread to reload /etc/nix/machines periodically. */
void monitorMachinesFile();
/* Database helpers operating inside the caller's transaction `txn`.
errorMsg defaults to "" and propagatedFrom to 0.
NOTE(review): createBuildStep presumably returns the number of the
new step (cf. the stepNr parameter of finishBuildStep) -- confirm. */
int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step,
const std::string & machine, BuildStepStatus status, const std::string & errorMsg = "",
BuildID propagatedFrom = 0);
void finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, BuildID buildId, int stepNr,
const std::string & machine, BuildStepStatus status, const string & errorMsg = "",
BuildID propagatedFrom = 0);
void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status);
/* Queue monitoring. lastBuildId is passed by non-const reference,
so getQueuedBuilds can update it for the caller. */
void queueMonitor();
void queueMonitorLoop();
void getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId);
void removeCancelledBuilds(Connection & conn);
/* Create the step for drvPath. finishedDrvs, newSteps and
newRunnable are passed by non-const reference and can be
modified by the call. */
Step::ptr createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,
Build::ptr referringBuild, Step::ptr referringStep, std::set<Path> & finishedDrvs,
std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable);
void makeRunnable(Step::ptr step);
/* The thread that selects and starts runnable builds. */
void dispatcher();
void wakeDispatcher();
/* NOTE(review): presumably runs `step` on `machine` while holding
`reservation` -- confirm in the definition. */
void builder(Step::ptr step, Machine::ptr machine, std::shared_ptr<MaintainCount> reservation);
/* Perform the given build step. Return true if the step is to be
retried. */
bool doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
Machine::ptr machine);
void markSucceededBuild(pqxx::work & txn, Build::ptr build,
const BuildResult & res, bool isCachedBuild, time_t startTime, time_t stopTime);
bool checkCachedFailure(Step::ptr step, Connection & conn);
/* Thread that asynchronously bzips logs of finished steps. */
void logCompressor();
/* Thread that asynchronously invokes hydra-notify to send build
notifications. */
void notificationSender();
/* Acquire the global queue runner lock, or null if somebody else
has it. */
std::shared_ptr<PathLocks> acquireGlobalLock();
void dumpStatus(Connection & conn, bool log);
public:
/* Public entry points. run() takes an optional build id that
defaults to 0.
NOTE(review): presumably the --build-one build stored in the
buildOne member -- confirm. */
void showStatus();
void unlock();
void run(BuildID buildOne = 0);
};