From 3a6cb2f2707d20ee05d1dfd53592c4df926aa6fa Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Fri, 29 May 2015 20:55:13 +0200 Subject: [PATCH] Implement a database connection pool --- src/hydra-queue-runner/hydra-queue-runner.cc | 47 ++++++----- src/hydra-queue-runner/pool.hh | 85 ++++++++++++++++++++ 2 files changed, 111 insertions(+), 21 deletions(-) create mode 100644 src/hydra-queue-runner/pool.hh diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 2eee9463..ada15e67 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -9,6 +9,7 @@ #include "build-result.hh" #include "sync.hh" +#include "pool.hh" #include "store-api.hh" #include "derivations.hh" @@ -145,8 +146,10 @@ class State private: std::thread queueMonitorThread; - std::mutex queueMonitorMutex; + + /* CV for waking up the queue. */ std::condition_variable queueMonitorWakeup; + std::mutex queueMonitorMutex; /* The queued builds. */ typedef std::map Builds; @@ -165,12 +168,15 @@ private: std::condition_variable_any runnableWakeup; + /* PostgreSQL connection pool. */ + Pool dbPool; + public: State(); ~State(); - void markActiveBuildStepsAsAborted(pqxx::connection & conn, time_t stopTime); + void markActiveBuildStepsAsAborted(time_t stopTime); int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step, BuildStepStatus status, const std::string & errorMsg = "", BuildID propagatedFrom = 0); @@ -182,7 +188,7 @@ public: void queueMonitor(); - void getQueuedBuilds(std::shared_ptr store, pqxx::connection & conn); + void getQueuedBuilds(std::shared_ptr store); Step::ptr createStep(std::shared_ptr store, const Path & drvPath, std::set & newRunnable); @@ -213,18 +219,18 @@ State::State() State::~State() { try { - Connection conn; printMsg(lvlError, "clearing active build steps..."); - markActiveBuildStepsAsAborted(conn, time(0)); + markActiveBuildStepsAsAborted(time(0)); } catch (...) { ignoreException(); } } -void State::markActiveBuildStepsAsAborted(pqxx::connection & conn, time_t stopTime) +void State::markActiveBuildStepsAsAborted(time_t stopTime) { - pqxx::work txn(conn); + auto conn(dbPool.get()); + pqxx::work txn(*conn); txn.parameterized ("update BuildSteps set busy = 0, status = $1, stopTime = $2 where busy = 1") ((int) bssAborted) @@ -272,10 +278,8 @@ void State::queueMonitor() { auto store = openStore(); // FIXME: pool - Connection conn; - while (!exitRequested) { - getQueuedBuilds(store, conn); + getQueuedBuilds(store); { std::unique_lock lock(queueMonitorMutex); @@ -287,10 +291,12 @@ void State::queueMonitor() } -void State::getQueuedBuilds(std::shared_ptr store, pqxx::connection & conn) +void State::getQueuedBuilds(std::shared_ptr store) { printMsg(lvlError, "checking the queue..."); + auto conn(dbPool.get()); + #if 0 { auto runnable_(runnable.lock()); @@ -308,7 +314,7 @@ void State::getQueuedBuilds(std::shared_ptr store, pqxx::connection & std::list newBuilds; // FIXME: use queue { - pqxx::work txn(conn); + pqxx::work txn(*conn); // FIXME: query only builds with ID higher than the previous // highest. @@ -340,7 +346,7 @@ void State::getQueuedBuilds(std::shared_ptr store, pqxx::connection & if (!store->isValidPath(build->drvPath)) { /* Derivation has been GC'ed prematurely. */ - pqxx::work txn(conn); + pqxx::work txn(*conn); txn.parameterized ("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1") (build->id) @@ -360,7 +366,7 @@ void State::getQueuedBuilds(std::shared_ptr store, pqxx::connection & Derivation drv = readDerivation(build->drvPath); BuildResult res = getBuildResult(store, drv); - pqxx::work txn(conn); + pqxx::work txn(*conn); time_t now = time(0); markSucceededBuild(txn, build, res, true, now, now); txn.commit(); @@ -618,11 +624,11 @@ void State::doBuildStep(std::shared_ptr store, Step::ptr step) /* Create a build step record indicating that we started building. */ - Connection conn; + auto conn(dbPool.get()); time_t startTime = time(0); int stepNr; { - pqxx::work txn(conn); + pqxx::work txn(*conn); stepNr = createBuildStep(txn, startTime, build, step, bssBusy); txn.commit(); } @@ -668,7 +674,7 @@ void State::doBuildStep(std::shared_ptr store, Step::ptr step) /* Update the database. */ { - pqxx::work txn(conn); + pqxx::work txn(*conn); if (success) { @@ -757,10 +763,7 @@ void State::markSucceededBuild(pqxx::work & txn, Build::ptr build, void State::run() { - { - Connection conn; - markActiveBuildStepsAsAborted(conn, 0); - } + markActiveBuildStepsAsAborted(0); queueMonitorThread = std::thread(&State::queueMonitor, this); @@ -785,6 +788,8 @@ void State::run() { auto runnable_(runnable.lock()); } // barrier runnableWakeup.notify_all(); for (auto & thread : builderThreads) thread.join(); + + printMsg(lvlError, format("psql connections = %1%") % dbPool.count()); } diff --git a/src/hydra-queue-runner/pool.hh b/src/hydra-queue-runner/pool.hh new file mode 100644 index 00000000..0a58ebe0 --- /dev/null +++ b/src/hydra-queue-runner/pool.hh @@ -0,0 +1,85 @@ +#pragma once + +#include +#include + +#include "sync.hh" + +/* This template class implements a simple pool manager of resources + of some type R, such as database connections. It is used as + follows: + + class Connection { ... }; + + Pool pool; + + { + auto conn(pool.get()); + conn->exec("select ..."); + } + + Here, the Connection object referenced by ‘conn’ is automatically + returned to the pool when ‘conn’ goes out of scope. +*/ + +template +class Pool +{ +private: + struct State + { + unsigned int count = 0; + std::list> idle; + }; + + Sync state; + +public: + + class Handle + { + private: + Pool & pool; + std::shared_ptr r; + + friend Pool; + + Handle(Pool & pool, std::shared_ptr r) : pool(pool), r(r) { } + + public: + Handle(Handle && h) : pool(h.pool), r(h.r) { h.r.reset(); } + + Handle(const Handle & l) = delete; + + ~Handle() + { + auto state_(pool.state.lock()); + if (r) state_->idle.push_back(r); + } + + R * operator -> () { return r; } + R & operator * () { return *r; } + }; + + Handle get() + { + { + auto state_(state.lock()); + if (!state_->idle.empty()) { + auto p = state_->idle.back(); + state_->idle.pop_back(); + return Handle(*this, p); + } + state_->count++; + } + /* Note: we don't hold the lock while creating a new instance, + because creation might take a long time. */ + return Handle(*this, std::make_shared()); + } + + unsigned int count() + { + auto state_(state.lock()); + return state_->count; + } +};