Implement a database connection pool

This commit is contained in:
Eelco Dolstra 2015-05-29 20:55:13 +02:00
parent 214b95706c
commit 3a6cb2f270
2 changed files with 111 additions and 21 deletions

View file

@ -9,6 +9,7 @@
#include "build-result.hh" #include "build-result.hh"
#include "sync.hh" #include "sync.hh"
#include "pool.hh"
#include "store-api.hh" #include "store-api.hh"
#include "derivations.hh" #include "derivations.hh"
@ -145,8 +146,10 @@ class State
private: private:
std::thread queueMonitorThread; std::thread queueMonitorThread;
std::mutex queueMonitorMutex;
/* CV for waking up the queue. */
std::condition_variable queueMonitorWakeup; std::condition_variable queueMonitorWakeup;
std::mutex queueMonitorMutex;
/* The queued builds. */ /* The queued builds. */
typedef std::map<BuildID, Build::ptr> Builds; typedef std::map<BuildID, Build::ptr> Builds;
@ -165,12 +168,15 @@ private:
std::condition_variable_any runnableWakeup; std::condition_variable_any runnableWakeup;
/* PostgreSQL connection pool. */
Pool<Connection> dbPool;
public: public:
State(); State();
~State(); ~State();
void markActiveBuildStepsAsAborted(pqxx::connection & conn, time_t stopTime); void markActiveBuildStepsAsAborted(time_t stopTime);
int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step, int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step,
BuildStepStatus status, const std::string & errorMsg = "", BuildID propagatedFrom = 0); BuildStepStatus status, const std::string & errorMsg = "", BuildID propagatedFrom = 0);
@ -182,7 +188,7 @@ public:
void queueMonitor(); void queueMonitor();
void getQueuedBuilds(std::shared_ptr<StoreAPI> store, pqxx::connection & conn); void getQueuedBuilds(std::shared_ptr<StoreAPI> store);
Step::ptr createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath, Step::ptr createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,
std::set<Step::ptr> & newRunnable); std::set<Step::ptr> & newRunnable);
@ -213,18 +219,18 @@ State::State()
State::~State() State::~State()
{ {
try { try {
Connection conn;
printMsg(lvlError, "clearing active build steps..."); printMsg(lvlError, "clearing active build steps...");
markActiveBuildStepsAsAborted(conn, time(0)); markActiveBuildStepsAsAborted(time(0));
} catch (...) { } catch (...) {
ignoreException(); ignoreException();
} }
} }
void State::markActiveBuildStepsAsAborted(pqxx::connection & conn, time_t stopTime) void State::markActiveBuildStepsAsAborted(time_t stopTime)
{ {
pqxx::work txn(conn); auto conn(dbPool.get());
pqxx::work txn(*conn);
txn.parameterized txn.parameterized
("update BuildSteps set busy = 0, status = $1, stopTime = $2 where busy = 1") ("update BuildSteps set busy = 0, status = $1, stopTime = $2 where busy = 1")
((int) bssAborted) ((int) bssAborted)
@ -272,10 +278,8 @@ void State::queueMonitor()
{ {
auto store = openStore(); // FIXME: pool auto store = openStore(); // FIXME: pool
Connection conn;
while (!exitRequested) { while (!exitRequested) {
getQueuedBuilds(store, conn); getQueuedBuilds(store);
{ {
std::unique_lock<std::mutex> lock(queueMonitorMutex); std::unique_lock<std::mutex> lock(queueMonitorMutex);
@ -287,10 +291,12 @@ void State::queueMonitor()
} }
void State::getQueuedBuilds(std::shared_ptr<StoreAPI> store, pqxx::connection & conn) void State::getQueuedBuilds(std::shared_ptr<StoreAPI> store)
{ {
printMsg(lvlError, "checking the queue..."); printMsg(lvlError, "checking the queue...");
auto conn(dbPool.get());
#if 0 #if 0
{ {
auto runnable_(runnable.lock()); auto runnable_(runnable.lock());
@ -308,7 +314,7 @@ void State::getQueuedBuilds(std::shared_ptr<StoreAPI> store, pqxx::connection &
std::list<Build::ptr> newBuilds; // FIXME: use queue std::list<Build::ptr> newBuilds; // FIXME: use queue
{ {
pqxx::work txn(conn); pqxx::work txn(*conn);
// FIXME: query only builds with ID higher than the previous // FIXME: query only builds with ID higher than the previous
// highest. // highest.
@ -340,7 +346,7 @@ void State::getQueuedBuilds(std::shared_ptr<StoreAPI> store, pqxx::connection &
if (!store->isValidPath(build->drvPath)) { if (!store->isValidPath(build->drvPath)) {
/* Derivation has been GC'ed prematurely. */ /* Derivation has been GC'ed prematurely. */
pqxx::work txn(conn); pqxx::work txn(*conn);
txn.parameterized txn.parameterized
("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1") ("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1")
(build->id) (build->id)
@ -360,7 +366,7 @@ void State::getQueuedBuilds(std::shared_ptr<StoreAPI> store, pqxx::connection &
Derivation drv = readDerivation(build->drvPath); Derivation drv = readDerivation(build->drvPath);
BuildResult res = getBuildResult(store, drv); BuildResult res = getBuildResult(store, drv);
pqxx::work txn(conn); pqxx::work txn(*conn);
time_t now = time(0); time_t now = time(0);
markSucceededBuild(txn, build, res, true, now, now); markSucceededBuild(txn, build, res, true, now, now);
txn.commit(); txn.commit();
@ -618,11 +624,11 @@ void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step)
/* Create a build step record indicating that we started /* Create a build step record indicating that we started
building. */ building. */
Connection conn; auto conn(dbPool.get());
time_t startTime = time(0); time_t startTime = time(0);
int stepNr; int stepNr;
{ {
pqxx::work txn(conn); pqxx::work txn(*conn);
stepNr = createBuildStep(txn, startTime, build, step, bssBusy); stepNr = createBuildStep(txn, startTime, build, step, bssBusy);
txn.commit(); txn.commit();
} }
@ -668,7 +674,7 @@ void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step)
/* Update the database. */ /* Update the database. */
{ {
pqxx::work txn(conn); pqxx::work txn(*conn);
if (success) { if (success) {
@ -757,10 +763,7 @@ void State::markSucceededBuild(pqxx::work & txn, Build::ptr build,
void State::run() void State::run()
{ {
{ markActiveBuildStepsAsAborted(0);
Connection conn;
markActiveBuildStepsAsAborted(conn, 0);
}
queueMonitorThread = std::thread(&State::queueMonitor, this); queueMonitorThread = std::thread(&State::queueMonitor, this);
@ -785,6 +788,8 @@ void State::run()
{ auto runnable_(runnable.lock()); } // barrier { auto runnable_(runnable.lock()); } // barrier
runnableWakeup.notify_all(); runnableWakeup.notify_all();
for (auto & thread : builderThreads) thread.join(); for (auto & thread : builderThreads) thread.join();
printMsg(lvlError, format("psql connections = %1%") % dbPool.count());
} }

View file

@ -0,0 +1,85 @@
#pragma once
#include <memory>
#include <list>
#include "sync.hh"
/* This template class implements a simple pool manager of resources
of some type R, such as database connections. It is used as
follows:
class Connection { ... };
Pool<Connection> pool;
{
auto conn(pool.get());
conn->exec("select ...");
}
Here, the Connection object referenced by conn is automatically
returned to the pool when conn goes out of scope.
*/
template <class R>
class Pool
{
private:
struct State
{
unsigned int count = 0;
std::list<std::shared_ptr<R>> idle;
};
Sync<State> state;
public:
class Handle
{
private:
Pool & pool;
std::shared_ptr<R> r;
friend Pool;
Handle(Pool & pool, std::shared_ptr<R> r) : pool(pool), r(r) { }
public:
Handle(Handle && h) : pool(h.pool), r(h.r) { h.r.reset(); }
Handle(const Handle & l) = delete;
~Handle()
{
auto state_(pool.state.lock());
if (r) state_->idle.push_back(r);
}
R * operator -> () { return r; }
R & operator * () { return *r; }
};
Handle get()
{
{
auto state_(state.lock());
if (!state_->idle.empty()) {
auto p = state_->idle.back();
state_->idle.pop_back();
return Handle(*this, p);
}
state_->count++;
}
/* Note: we don't hold the lock while creating a new instance,
because creation might take a long time. */
return Handle(*this, std::make_shared<R>());
}
unsigned int count()
{
auto state_(state.lock());
return state_->count;
}
};