forked from lix-project/hydra
Pool local store connections
This commit is contained in:
parent
1cefd6cac8
commit
88a05763cc
6 changed files with 38 additions and 17 deletions
|
@ -234,11 +234,11 @@ void BinaryCacheStore::querySubstitutablePathInfos(const PathSet & paths,
|
||||||
auto localStore = storeFactory();
|
auto localStore = storeFactory();
|
||||||
|
|
||||||
for (auto & storePath : paths) {
|
for (auto & storePath : paths) {
|
||||||
if (!localStore->isValidPath(storePath)) {
|
if (!(*localStore)->isValidPath(storePath)) {
|
||||||
left.insert(storePath);
|
left.insert(storePath);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
ValidPathInfo info = localStore->queryPathInfo(storePath);
|
ValidPathInfo info = (*localStore)->queryPathInfo(storePath);
|
||||||
SubstitutablePathInfo sub;
|
SubstitutablePathInfo sub;
|
||||||
sub.references = info.references;
|
sub.references = info.references;
|
||||||
sub.downloadSize = 0;
|
sub.downloadSize = 0;
|
||||||
|
@ -246,7 +246,7 @@ void BinaryCacheStore::querySubstitutablePathInfos(const PathSet & paths,
|
||||||
infos.emplace(storePath, sub);
|
infos.emplace(storePath, sub);
|
||||||
}
|
}
|
||||||
|
|
||||||
localStore->querySubstitutablePathInfos(left, infos);
|
//(*localStore)->querySubstitutablePathInfos(left, infos);
|
||||||
}
|
}
|
||||||
|
|
||||||
void BinaryCacheStore::buildPaths(const PathSet & paths, BuildMode buildMode)
|
void BinaryCacheStore::buildPaths(const PathSet & paths, BuildMode buildMode)
|
||||||
|
@ -258,12 +258,12 @@ void BinaryCacheStore::buildPaths(const PathSet & paths, BuildMode buildMode)
|
||||||
|
|
||||||
if (isValidPath(storePath)) continue;
|
if (isValidPath(storePath)) continue;
|
||||||
|
|
||||||
localStore->addTempRoot(storePath);
|
(*localStore)->addTempRoot(storePath);
|
||||||
|
|
||||||
if (!localStore->isValidPath(storePath))
|
if (!(*localStore)->isValidPath(storePath))
|
||||||
localStore->ensurePath(storePath);
|
(*localStore)->ensurePath(storePath);
|
||||||
|
|
||||||
ValidPathInfo info = localStore->queryPathInfo(storePath);
|
ValidPathInfo info = (*localStore)->queryPathInfo(storePath);
|
||||||
|
|
||||||
for (auto & ref : info.references)
|
for (auto & ref : info.references)
|
||||||
if (ref != storePath)
|
if (ref != storePath)
|
||||||
|
|
|
@ -5,6 +5,7 @@
|
||||||
|
|
||||||
#include "lru-cache.hh"
|
#include "lru-cache.hh"
|
||||||
#include "sync.hh"
|
#include "sync.hh"
|
||||||
|
#include "pool.hh"
|
||||||
|
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
|
|
||||||
|
@ -15,7 +16,8 @@ struct NarInfo;
|
||||||
/* While BinaryCacheStore is thread-safe, LocalStore and RemoteStore
|
/* While BinaryCacheStore is thread-safe, LocalStore and RemoteStore
|
||||||
aren't. Until they are, use a factory to produce a thread-local
|
aren't. Until they are, use a factory to produce a thread-local
|
||||||
local store. */
|
local store. */
|
||||||
typedef std::function<ref<Store>()> StoreFactory;
|
typedef Pool<nix::ref<nix::Store>> StorePool;
|
||||||
|
typedef std::function<StorePool::Handle()> StoreFactory;
|
||||||
|
|
||||||
class BinaryCacheStore : public Store
|
class BinaryCacheStore : public Store
|
||||||
{
|
{
|
||||||
|
|
|
@ -18,6 +18,7 @@ using namespace nix;
|
||||||
|
|
||||||
|
|
||||||
State::State()
|
State::State()
|
||||||
|
: localStorePool([]() { return std::make_shared<ref<Store>>(openStore()); })
|
||||||
{
|
{
|
||||||
hydraData = getEnv("HYDRA_DATA");
|
hydraData = getEnv("HYDRA_DATA");
|
||||||
if (hydraData == "") throw Error("$HYDRA_DATA must be set");
|
if (hydraData == "") throw Error("$HYDRA_DATA must be set");
|
||||||
|
@ -26,9 +27,10 @@ State::State()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
ref<Store> State::getLocalStore()
|
StorePool::Handle State::getLocalStore()
|
||||||
{
|
{
|
||||||
return openStore(); // FIXME: pool
|
auto conn(localStorePool.get());
|
||||||
|
return conn;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -733,10 +735,10 @@ void State::run(BuildID buildOne)
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
auto store = std::make_shared<S3BinaryCacheStore>(
|
auto store = std::make_shared<S3BinaryCacheStore>(
|
||||||
[]() { return openStore(); },
|
[this]() { return this->getLocalStore(); },
|
||||||
"/home/eelco/Misc/Keys/test.nixos.org/secret",
|
"/home/eelco/hydra/secret",
|
||||||
"/home/eelco/Misc/Keys/test.nixos.org/public",
|
"/home/eelco/hydra/public",
|
||||||
"nix-test-cache-3");;
|
"nix-test-cache");
|
||||||
store->init();
|
store->init();
|
||||||
_destStore = store;
|
_destStore = store;
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
|
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <list>
|
#include <list>
|
||||||
|
#include <functional>
|
||||||
|
|
||||||
#include "sync.hh"
|
#include "sync.hh"
|
||||||
|
|
||||||
|
@ -25,7 +26,14 @@
|
||||||
template <class R>
|
template <class R>
|
||||||
class Pool
|
class Pool
|
||||||
{
|
{
|
||||||
|
public:
|
||||||
|
|
||||||
|
typedef std::function<std::shared_ptr<R>()> Factory;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
|
Factory factory;
|
||||||
|
|
||||||
struct State
|
struct State
|
||||||
{
|
{
|
||||||
unsigned int count = 0;
|
unsigned int count = 0;
|
||||||
|
@ -36,6 +44,10 @@ private:
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
|
||||||
|
Pool(const Factory & factory = []() { return std::make_shared<R>(); })
|
||||||
|
: factory(factory)
|
||||||
|
{ }
|
||||||
|
|
||||||
class Handle
|
class Handle
|
||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
|
@ -74,7 +86,7 @@ public:
|
||||||
}
|
}
|
||||||
/* Note: we don't hold the lock while creating a new instance,
|
/* Note: we don't hold the lock while creating a new instance,
|
||||||
because creation might take a long time. */
|
because creation might take a long time. */
|
||||||
return Handle(*this, std::make_shared<R>());
|
return Handle(*this, factory());
|
||||||
}
|
}
|
||||||
|
|
||||||
unsigned int count()
|
unsigned int count()
|
||||||
|
|
|
@ -36,7 +36,7 @@ void State::queueMonitorLoop()
|
||||||
unsigned int lastBuildId = 0;
|
unsigned int lastBuildId = 0;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
bool done = getQueuedBuilds(*conn, localStore, destStore, lastBuildId);
|
bool done = getQueuedBuilds(*conn, *localStore, destStore, lastBuildId);
|
||||||
|
|
||||||
/* Sleep until we get notification from the database about an
|
/* Sleep until we get notification from the database about an
|
||||||
event. */
|
event. */
|
||||||
|
|
|
@ -16,6 +16,8 @@
|
||||||
#include "store-api.hh"
|
#include "store-api.hh"
|
||||||
#include "derivations.hh"
|
#include "derivations.hh"
|
||||||
|
|
||||||
|
#include "binary-cache-store.hh" // FIXME
|
||||||
|
|
||||||
|
|
||||||
typedef unsigned int BuildID;
|
typedef unsigned int BuildID;
|
||||||
|
|
||||||
|
@ -346,6 +348,9 @@ private:
|
||||||
|
|
||||||
std::atomic<time_t> lastDispatcherCheck{0};
|
std::atomic<time_t> lastDispatcherCheck{0};
|
||||||
|
|
||||||
|
/* Pool of local stores. */
|
||||||
|
nix::StorePool localStorePool;
|
||||||
|
|
||||||
/* Destination store. */
|
/* Destination store. */
|
||||||
std::shared_ptr<nix::Store> _destStore;
|
std::shared_ptr<nix::Store> _destStore;
|
||||||
|
|
||||||
|
@ -356,7 +361,7 @@ private:
|
||||||
|
|
||||||
/* Return a store object that can access derivations produced by
|
/* Return a store object that can access derivations produced by
|
||||||
hydra-evaluator. */
|
hydra-evaluator. */
|
||||||
nix::ref<nix::Store> getLocalStore();
|
nix::StorePool::Handle getLocalStore();
|
||||||
|
|
||||||
/* Return a store object to store build results. */
|
/* Return a store object to store build results. */
|
||||||
nix::ref<nix::Store> getDestStore();
|
nix::ref<nix::Store> getDestStore();
|
||||||
|
|
Loading…
Reference in a new issue