From d5626bf4c14f725136f2c5b6ac8bf818627352f0 Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Tue, 23 Feb 2016 16:40:16 +0100 Subject: [PATCH] Pool: Allow a maximum pool size --- src/libmain/shared.cc | 1 + src/libstore/remote-store.cc | 30 ++++++++-------- src/libstore/remote-store.hh | 7 ++-- src/libutil/pool.hh | 69 ++++++++++++++++++++++++++++-------- 4 files changed, 74 insertions(+), 33 deletions(-) diff --git a/src/libmain/shared.cc b/src/libmain/shared.cc index 8f2aa8420..c27302227 100644 --- a/src/libmain/shared.cc +++ b/src/libmain/shared.cc @@ -126,6 +126,7 @@ void initNix() std::cerr.rdbuf()->pubsetbuf(buf, sizeof(buf)); #endif + // FIXME: do we need this? It's not thread-safe. std::ios::sync_with_stdio(false); if (getEnv("IN_SYSTEMD") == "1") diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index 847da107a..f6ec3fffb 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -14,8 +14,8 @@ #include #include #include - #include + #include namespace nix { @@ -39,8 +39,8 @@ template T readStorePaths(Source & from) template PathSet readStorePaths(Source & from); -RemoteStore::RemoteStore() - : connections(make_ref>([this]() { return openConnection(); })) +RemoteStore::RemoteStore(size_t maxConnections) + : connections(make_ref>(maxConnections, [this]() { return openConnection(); })) { } @@ -116,18 +116,6 @@ ref RemoteStore::openConnection(bool reserveSpace) } -RemoteStore::~RemoteStore() -{ - try { - //to.flush(); - //fdSocket.close(); - // FIXME: close pool - } catch (...) { - ignoreException(); - } -} - - void RemoteStore::setOptions(ref conn) { conn->to << wopSetOptions @@ -568,6 +556,18 @@ bool RemoteStore::verifyStore(bool checkContents, bool repair) return readInt(conn->from) != 0; } + +RemoteStore::Connection::~Connection() +{ + try { + to.flush(); + fd.close(); + } catch (...) { + ignoreException(); + } +} + + void RemoteStore::Connection::processStderr(Sink * sink, Source * source) { to.flush(); diff --git a/src/libstore/remote-store.hh b/src/libstore/remote-store.hh index b16a6b51d..af417b84f 100644 --- a/src/libstore/remote-store.hh +++ b/src/libstore/remote-store.hh @@ -1,5 +1,6 @@ #pragma once +#include #include #include "store-api.hh" @@ -19,9 +20,7 @@ class RemoteStore : public Store { public: - RemoteStore(); - - ~RemoteStore(); + RemoteStore(size_t maxConnections = std::numeric_limits::max()); /* Implementations of abstract store API methods. */ @@ -100,6 +99,8 @@ private: FdSource from; unsigned int daemonVersion; + ~Connection(); + void processStderr(Sink * sink = 0, Source * source = 0); }; diff --git a/src/libutil/pool.hh b/src/libutil/pool.hh index d63912e28..4b865a193 100644 --- a/src/libutil/pool.hh +++ b/src/libutil/pool.hh @@ -1,8 +1,10 @@ #pragma once -#include -#include #include +#include +#include +#include +#include #include "sync.hh" #include "ref.hh" @@ -39,37 +41,58 @@ private: struct State { - unsigned int count = 0; - std::list> idle; + size_t inUse = 0; + size_t max; + std::vector> idle; }; Sync state; + std::condition_variable_any wakeup; + public: - Pool(const Factory & factory = []() { return make_ref(); }) + Pool(size_t max = std::numeric_limits::max, + const Factory & factory = []() { return make_ref(); }) : factory(factory) - { } + { + auto state_(state.lock()); + state_->max = max; + } + + ~Pool() + { + auto state_(state.lock()); + assert(!state_->inUse); + state_->max = 0; + state_->idle.clear(); + } class Handle { private: Pool & pool; - ref r; + std::shared_ptr r; friend Pool; Handle(Pool & pool, std::shared_ptr r) : pool(pool), r(r) { } public: - Handle(Handle && h) : pool(h.pool), r(h.r) { abort(); } + Handle(Handle && h) : pool(h.pool), r(h.r) { h.r.reset(); } Handle(const Handle & l) = delete; ~Handle() { - auto state_(pool.state.lock()); - state_->idle.push_back(r); + if (!r) return; + { + auto state_(pool.state.lock()); + state_->idle.push_back(ref(r)); + assert(state_->inUse); + state_->inUse--; + } + pool.wakeup.notify_one(); } R * operator -> () { return &*r; } @@ -80,22 +103,38 @@ public: { { auto state_(state.lock()); + + /* If we're over the maximum number of instance, we need + to wait until a slot becomes available. */ + while (state_->idle.empty() && state_->inUse >= state_->max) + state_.wait(wakeup); + if (!state_->idle.empty()) { auto p = state_->idle.back(); state_->idle.pop_back(); + state_->inUse++; return Handle(*this, p); } - state_->count++; + + state_->inUse++; + } + + /* We need to create a new instance. Because that might take a + while, we don't hold the lock in the meantime. */ + try { + Handle h(*this, factory()); + return h; + } catch (...) { + auto state_(state.lock()); + state_->inUse--; + throw; } - /* Note: we don't hold the lock while creating a new instance, - because creation might take a long time. */ - return Handle(*this, factory()); } unsigned int count() { auto state_(state.lock()); - return state_->count; + return state_->count + state_->inUse; } };