Sync with Nix

This commit is contained in:
Eelco Dolstra 2016-02-24 14:04:31 +01:00
parent 7b509237cd
commit 8321a3eb27
12 changed files with 50 additions and 229 deletions

View file

@ -12,9 +12,9 @@
namespace nix { namespace nix {
BinaryCacheStore::BinaryCacheStore(const StoreFactory & storeFactory, BinaryCacheStore::BinaryCacheStore(std::shared_ptr<Store> localStore,
const Path & secretKeyFile, const Path & publicKeyFile) const Path & secretKeyFile, const Path & publicKeyFile)
: storeFactory(storeFactory) : localStore(localStore)
{ {
if (secretKeyFile != "") if (secretKeyFile != "")
secretKey = std::unique_ptr<SecretKey>(new SecretKey(readFile(secretKeyFile))); secretKey = std::unique_ptr<SecretKey>(new SecretKey(readFile(secretKeyFile)));
@ -237,14 +237,14 @@ void BinaryCacheStore::querySubstitutablePathInfos(const PathSet & paths,
{ {
PathSet left; PathSet left;
auto localStore = storeFactory(); if (!localStore) return;
for (auto & storePath : paths) { for (auto & storePath : paths) {
if (!(*localStore)->isValidPath(storePath)) { if (!localStore->isValidPath(storePath)) {
left.insert(storePath); left.insert(storePath);
continue; continue;
} }
ValidPathInfo info = (*localStore)->queryPathInfo(storePath); ValidPathInfo info = localStore->queryPathInfo(storePath);
SubstitutablePathInfo sub; SubstitutablePathInfo sub;
sub.references = info.references; sub.references = info.references;
sub.downloadSize = 0; sub.downloadSize = 0;
@ -253,24 +253,25 @@ void BinaryCacheStore::querySubstitutablePathInfos(const PathSet & paths,
} }
if (settings.useSubstitutes) if (settings.useSubstitutes)
(*localStore)->querySubstitutablePathInfos(left, infos); localStore->querySubstitutablePathInfos(left, infos);
} }
void BinaryCacheStore::buildPaths(const PathSet & paths, BuildMode buildMode) void BinaryCacheStore::buildPaths(const PathSet & paths, BuildMode buildMode)
{ {
auto localStore = storeFactory();
for (auto & storePath : paths) { for (auto & storePath : paths) {
assert(!isDerivation(storePath)); assert(!isDerivation(storePath));
if (isValidPath(storePath)) continue; if (isValidPath(storePath)) continue;
(*localStore)->addTempRoot(storePath); if (!localStore)
throw Error(format("don't know how to realise path %1% in a binary cache") % storePath);
if (!(*localStore)->isValidPath(storePath)) localStore->addTempRoot(storePath);
(*localStore)->ensurePath(storePath);
ValidPathInfo info = (*localStore)->queryPathInfo(storePath); if (!localStore->isValidPath(storePath))
localStore->ensurePath(storePath);
ValidPathInfo info = localStore->queryPathInfo(storePath);
for (auto & ref : info.references) for (auto & ref : info.references)
if (ref != storePath) if (ref != storePath)

View file

@ -13,12 +13,6 @@ namespace nix {
struct NarInfo; struct NarInfo;
/* While BinaryCacheStore is thread-safe, LocalStore and RemoteStore
aren't. Until they are, use a factory to produce a thread-local
local store. */
typedef Pool<nix::ref<nix::Store>> StorePool;
typedef std::function<StorePool::Handle()> StoreFactory;
class BinaryCacheStore : public Store class BinaryCacheStore : public Store
{ {
private: private:
@ -26,7 +20,7 @@ private:
std::unique_ptr<SecretKey> secretKey; std::unique_ptr<SecretKey> secretKey;
std::unique_ptr<PublicKeys> publicKeys; std::unique_ptr<PublicKeys> publicKeys;
StoreFactory storeFactory; std::shared_ptr<Store> localStore;
struct State struct State
{ {
@ -37,7 +31,7 @@ private:
protected: protected:
BinaryCacheStore(const StoreFactory & storeFactory, BinaryCacheStore(std::shared_ptr<Store> localStore,
const Path & secretKeyFile, const Path & publicKeyFile); const Path & secretKeyFile, const Path & publicKeyFile);
virtual bool fileExists(const std::string & path) = 0; virtual bool fileExists(const std::string & path) = 0;

View file

@ -18,7 +18,6 @@ using namespace nix;
State::State() State::State()
: localStorePool([]() { return std::make_shared<ref<Store>>(openStore()); })
{ {
hydraData = getEnv("HYDRA_DATA"); hydraData = getEnv("HYDRA_DATA");
if (hydraData == "") throw Error("$HYDRA_DATA must be set"); if (hydraData == "") throw Error("$HYDRA_DATA must be set");
@ -46,10 +45,9 @@ State::State()
} }
StorePool::Handle State::getLocalStore() ref<Store> State::getLocalStore()
{ {
auto conn(localStorePool.get()); return ref<Store>(_localStore);
return conn;
} }
@ -748,8 +746,10 @@ void State::run(BuildID buildOne)
auto storeMode = hydraConfig["store_mode"]; auto storeMode = hydraConfig["store_mode"];
_localStore = openStore();
if (storeMode == "direct" || storeMode == "") { if (storeMode == "direct" || storeMode == "") {
_destStore = openStore(); _destStore = _localStore;
} }
else if (storeMode == "local-binary-cache") { else if (storeMode == "local-binary-cache") {
@ -757,9 +757,9 @@ void State::run(BuildID buildOne)
if (dir == "") if (dir == "")
throw Error("you must set binary_cache_dir in hydra.conf"); throw Error("you must set binary_cache_dir in hydra.conf");
auto store = make_ref<LocalBinaryCacheStore>( auto store = make_ref<LocalBinaryCacheStore>(
[this]() { return this->getLocalStore(); }, _localStore,
"/home/eelco/Misc/Keys/test.nixos.org/secret", hydraConfig["binary_cache_secret_key_file"],
"/home/eelco/Misc/Keys/test.nixos.org/public", hydraConfig["binary_cache_public_key_file"],
dir); dir);
store->init(); store->init();
_destStore = std::shared_ptr<LocalBinaryCacheStore>(store); _destStore = std::shared_ptr<LocalBinaryCacheStore>(store);
@ -770,7 +770,7 @@ void State::run(BuildID buildOne)
if (bucketName == "") if (bucketName == "")
throw Error("you must set binary_cache_s3_bucket in hydra.conf"); throw Error("you must set binary_cache_s3_bucket in hydra.conf");
auto store = make_ref<S3BinaryCacheStore>( auto store = make_ref<S3BinaryCacheStore>(
[this]() { return this->getLocalStore(); }, _localStore,
hydraConfig["binary_cache_secret_key_file"], hydraConfig["binary_cache_secret_key_file"],
hydraConfig["binary_cache_public_key_file"], hydraConfig["binary_cache_public_key_file"],
bucketName); bucketName);

View file

@ -2,10 +2,10 @@
namespace nix { namespace nix {
LocalBinaryCacheStore::LocalBinaryCacheStore(const StoreFactory & storeFactory, LocalBinaryCacheStore::LocalBinaryCacheStore(std::shared_ptr<Store> localStore,
const Path & secretKeyFile, const Path & publicKeyFile, const Path & secretKeyFile, const Path & publicKeyFile,
const Path & binaryCacheDir) const Path & binaryCacheDir)
: BinaryCacheStore(storeFactory, secretKeyFile, publicKeyFile) : BinaryCacheStore(localStore, secretKeyFile, publicKeyFile)
, binaryCacheDir(binaryCacheDir) , binaryCacheDir(binaryCacheDir)
{ {
} }

View file

@ -12,7 +12,7 @@ private:
public: public:
LocalBinaryCacheStore(const StoreFactory & storeFactory, LocalBinaryCacheStore(std::shared_ptr<Store> localStore,
const Path & secretKeyFile, const Path & publicKeyFile, const Path & secretKeyFile, const Path & publicKeyFile,
const Path & binaryCacheDir); const Path & binaryCacheDir);

View file

@ -1,97 +0,0 @@
#pragma once
#include <memory>
#include <list>
#include <functional>
#include "sync.hh"
/* This template class implements a simple pool manager of resources
of some type R, such as database connections. It is used as
follows:
class Connection { ... };
Pool<Connection> pool;
{
auto conn(pool.get());
conn->exec("select ...");
}
Here, the Connection object referenced by conn is automatically
returned to the pool when conn goes out of scope.
*/
template <class R>
class Pool
{
public:
typedef std::function<std::shared_ptr<R>()> Factory;
private:
Factory factory;
struct State
{
unsigned int count = 0;
std::list<std::shared_ptr<R>> idle;
};
Sync<State> state;
public:
Pool(const Factory & factory = []() { return std::make_shared<R>(); })
: factory(factory)
{ }
class Handle
{
private:
Pool & pool;
std::shared_ptr<R> r;
friend Pool;
Handle(Pool & pool, std::shared_ptr<R> r) : pool(pool), r(r) { }
public:
Handle(Handle && h) : pool(h.pool), r(h.r) { h.r.reset(); }
Handle(const Handle & l) = delete;
~Handle()
{
auto state_(pool.state.lock());
if (r) state_->idle.push_back(r);
}
R * operator -> () { return r.get(); }
R & operator * () { return *r; }
};
Handle get()
{
{
auto state_(state.lock());
if (!state_->idle.empty()) {
auto p = state_->idle.back();
state_->idle.pop_back();
return Handle(*this, p);
}
state_->count++;
}
/* Note: we don't hold the lock while creating a new instance,
because creation might take a long time. */
return Handle(*this, factory());
}
unsigned int count()
{
auto state_(state.lock());
return state_->count;
}
};

View file

@ -36,7 +36,7 @@ void State::queueMonitorLoop()
unsigned int lastBuildId = 0; unsigned int lastBuildId = 0;
while (true) { while (true) {
bool done = getQueuedBuilds(*conn, *localStore, destStore, lastBuildId); bool done = getQueuedBuilds(*conn, localStore, destStore, lastBuildId);
/* Sleep until we get notification from the database about an /* Sleep until we get notification from the database about an
event. */ event. */

View file

@ -31,10 +31,10 @@ R && checkAws(Aws::Utils::Outcome<R, E> && outcome)
return outcome.GetResultWithOwnership(); return outcome.GetResultWithOwnership();
} }
S3BinaryCacheStore::S3BinaryCacheStore(const StoreFactory & storeFactory, S3BinaryCacheStore::S3BinaryCacheStore(std::shared_ptr<Store> localStore,
const Path & secretKeyFile, const Path & publicKeyFile, const Path & secretKeyFile, const Path & publicKeyFile,
const std::string & bucketName) const std::string & bucketName)
: BinaryCacheStore(storeFactory, secretKeyFile, publicKeyFile) : BinaryCacheStore(localStore, secretKeyFile, publicKeyFile)
, bucketName(bucketName) , bucketName(bucketName)
, config(makeConfig()) , config(makeConfig())
, client(make_ref<Aws::S3::S3Client>(*config)) , client(make_ref<Aws::S3::S3Client>(*config))

View file

@ -20,7 +20,7 @@ private:
public: public:
S3BinaryCacheStore(const StoreFactory & storeFactory, S3BinaryCacheStore(std::shared_ptr<Store> localStore,
const Path & secretKeyFile, const Path & publicKeyFile, const Path & secretKeyFile, const Path & publicKeyFile,
const std::string & bucketName); const std::string & bucketName);

View file

@ -80,7 +80,7 @@ private:
std::atomic<unsigned int> shares{1}; std::atomic<unsigned int> shares{1};
/* The start time and duration of the most recent build steps. */ /* The start time and duration of the most recent build steps. */
Sync<std::map<time_t, time_t>> steps; nix::Sync<std::map<time_t, time_t>> steps;
public: public:
@ -187,7 +187,7 @@ struct Step
std::atomic_bool finished{false}; // debugging std::atomic_bool finished{false}; // debugging
Sync<State> state; nix::Sync<State> state;
~Step() ~Step()
{ {
@ -227,7 +227,7 @@ struct Machine
system_time lastFailure, disabledUntil; system_time lastFailure, disabledUntil;
unsigned int consecutiveFailures; unsigned int consecutiveFailures;
}; };
Sync<ConnectInfo> connectInfo; nix::Sync<ConnectInfo> connectInfo;
/* Mutex to prevent multiple threads from sending data to the /* Mutex to prevent multiple threads from sending data to the
same machine (which would be inefficient). */ same machine (which would be inefficient). */
@ -266,33 +266,33 @@ private:
/* The queued builds. */ /* The queued builds. */
typedef std::map<BuildID, Build::ptr> Builds; typedef std::map<BuildID, Build::ptr> Builds;
Sync<Builds> builds; nix::Sync<Builds> builds;
/* The jobsets. */ /* The jobsets. */
typedef std::map<std::pair<std::string, std::string>, Jobset::ptr> Jobsets; typedef std::map<std::pair<std::string, std::string>, Jobset::ptr> Jobsets;
Sync<Jobsets> jobsets; nix::Sync<Jobsets> jobsets;
/* All active or pending build steps (i.e. dependencies of the /* All active or pending build steps (i.e. dependencies of the
queued builds). Note that these are weak pointers. Steps are queued builds). Note that these are weak pointers. Steps are
kept alive by being reachable from Builds or by being in kept alive by being reachable from Builds or by being in
progress. */ progress. */
typedef std::map<nix::Path, Step::wptr> Steps; typedef std::map<nix::Path, Step::wptr> Steps;
Sync<Steps> steps; nix::Sync<Steps> steps;
/* Build steps that have no unbuilt dependencies. */ /* Build steps that have no unbuilt dependencies. */
typedef std::list<Step::wptr> Runnable; typedef std::list<Step::wptr> Runnable;
Sync<Runnable> runnable; nix::Sync<Runnable> runnable;
/* CV for waking up the dispatcher. */ /* CV for waking up the dispatcher. */
Sync<bool> dispatcherWakeup; nix::Sync<bool> dispatcherWakeup;
std::condition_variable_any dispatcherWakeupCV; std::condition_variable dispatcherWakeupCV;
/* PostgreSQL connection pool. */ /* PostgreSQL connection pool. */
Pool<Connection> dbPool; nix::Pool<Connection> dbPool;
/* The build machines. */ /* The build machines. */
typedef std::map<std::string, Machine::ptr> Machines; typedef std::map<std::string, Machine::ptr> Machines;
Sync<Machines> machines; // FIXME: use atomic_shared_ptr nix::Sync<Machines> machines; // FIXME: use atomic_shared_ptr
/* Various stats. */ /* Various stats. */
time_t startedAt; time_t startedAt;
@ -314,16 +314,16 @@ private:
counter bytesReceived{0}; counter bytesReceived{0};
/* Log compressor work queue. */ /* Log compressor work queue. */
Sync<std::queue<nix::Path>> logCompressorQueue; nix::Sync<std::queue<nix::Path>> logCompressorQueue;
std::condition_variable_any logCompressorWakeup; std::condition_variable logCompressorWakeup;
/* Notification sender work queue. FIXME: if hydra-queue-runner is /* Notification sender work queue. FIXME: if hydra-queue-runner is
killed before it has finished sending notifications about a killed before it has finished sending notifications about a
build, then the notifications may be lost. It would be better build, then the notifications may be lost. It would be better
to mark builds with pending notification in the database. */ to mark builds with pending notification in the database. */
typedef std::pair<BuildID, std::vector<BuildID>> NotificationItem; typedef std::pair<BuildID, std::vector<BuildID>> NotificationItem;
Sync<std::queue<NotificationItem>> notificationSenderQueue; nix::Sync<std::queue<NotificationItem>> notificationSenderQueue;
std::condition_variable_any notificationSenderWakeup; std::condition_variable notificationSenderWakeup;
/* Specific build to do for --build-one (testing only). */ /* Specific build to do for --build-one (testing only). */
BuildID buildOne; BuildID buildOne;
@ -336,7 +336,7 @@ private:
std::chrono::seconds waitTime; // time runnable steps have been waiting std::chrono::seconds waitTime; // time runnable steps have been waiting
}; };
Sync<std::map<std::string, MachineType>> machineTypes; nix::Sync<std::map<std::string, MachineType>> machineTypes;
struct MachineReservation struct MachineReservation
{ {
@ -350,10 +350,7 @@ private:
std::atomic<time_t> lastDispatcherCheck{0}; std::atomic<time_t> lastDispatcherCheck{0};
/* Pool of local stores. */ std::shared_ptr<nix::Store> _localStore;
nix::StorePool localStorePool;
/* Destination store. */
std::shared_ptr<nix::Store> _destStore; std::shared_ptr<nix::Store> _destStore;
public: public:
@ -363,7 +360,7 @@ private:
/* Return a store object that can access derivations produced by /* Return a store object that can access derivations produced by
hydra-evaluator. */ hydra-evaluator. */
nix::StorePool::Handle getLocalStore(); nix::ref<nix::Store> getLocalStore();
/* Return a store object to store build results. */ /* Return a store object to store build results. */
nix::ref<nix::Store> getDestStore(); nix::ref<nix::Store> getDestStore();

View file

@ -1,74 +0,0 @@
#pragma once
#include <mutex>
#include <condition_variable>
#include <cassert>
/* This template class ensures synchronized access to a value of type
T. It is used as follows:
struct Data { int x; ... };
Sync<Data> data;
{
auto data_(data.lock());
data_->x = 123;
}
Here, "data" is automatically unlocked when "data_" goes out of
scope.
*/
template<class T>
class Sync
{
private:
std::mutex mutex;
T data;
public:
Sync() { }
Sync(const T & data) : data(data) { }
class Lock
{
private:
Sync * s;
friend Sync;
Lock(Sync * s) : s(s) { s->mutex.lock(); }
public:
Lock(Lock && l) : s(l.s) { l.s = 0; }
Lock(const Lock & l) = delete;
~Lock() { if (s) s->mutex.unlock(); }
T * operator -> () { return &s->data; }
T & operator * () { return s->data; }
/* FIXME: performance impact of condition_variable_any? */
void wait(std::condition_variable_any & cv)
{
assert(s);
cv.wait(s->mutex);
}
template<class Rep, class Period, class Predicate>
bool wait_for(std::condition_variable_any & cv,
const std::chrono::duration<Rep, Period> & duration,
Predicate pred)
{
assert(s);
return cv.wait_for(s->mutex, duration, pred);
}
template<class Clock, class Duration>
std::cv_status wait_until(std::condition_variable_any & cv,
const std::chrono::time_point<Clock, Duration> & duration)
{
assert(s);
return cv.wait_until(s->mutex, duration);
}
};
Lock lock() { return Lock(this); }
};

View file

@ -14,7 +14,7 @@ class TokenServer
unsigned int maxTokens; unsigned int maxTokens;
Sync<unsigned int> curTokens{0}; Sync<unsigned int> curTokens{0};
std::condition_variable_any wakeup; std::condition_variable wakeup;
public: public:
TokenServer(unsigned int maxTokens) : maxTokens(maxTokens) { } TokenServer(unsigned int maxTokens) : maxTokens(maxTokens) { }