Remove bad daemon connections from the pool

This is necessary for long-running processes like hydra-queue-runner:
if a nix-daemon worker is killed, we need to stop reusing that
connection.
This commit is contained in:
Eelco Dolstra 2016-02-24 11:39:56 +01:00
parent d5626bf4c1
commit 5f862658c3
4 changed files with 55 additions and 14 deletions

View file

@ -40,7 +40,11 @@ template PathSet readStorePaths(Source & from);
RemoteStore::RemoteStore(size_t maxConnections) RemoteStore::RemoteStore(size_t maxConnections)
: connections(make_ref<Pool<Connection>>(maxConnections, [this]() { return openConnection(); })) : connections(make_ref<Pool<Connection>>(
maxConnections,
[this]() { return openConnection(); },
[](const ref<Connection> & r) { return r->to.good() && r->from.good(); }
))
{ {
} }

View file

@ -33,11 +33,17 @@ class Pool
{ {
public: public:
/* A function that produces new instances of R on demand. */
typedef std::function<ref<R>()> Factory; typedef std::function<ref<R>()> Factory;
/* A function that checks whether an instance of R is still
usable. Unusable instances are removed from the pool. */
typedef std::function<bool(const ref<R> &)> Validator;
private: private:
Factory factory; Factory factory;
Validator validator;
struct State struct State
{ {
@ -53,8 +59,10 @@ private:
public: public:
Pool(size_t max = std::numeric_limits<size_t>::max, Pool(size_t max = std::numeric_limits<size_t>::max,
const Factory & factory = []() { return make_ref<R>(); }) const Factory & factory = []() { return make_ref<R>(); },
const Validator & validator = [](ref<R> r) { return true; })
: factory(factory) : factory(factory)
, validator(validator)
{ {
auto state_(state.lock()); auto state_(state.lock());
state_->max = max; state_->max = max;
@ -109,12 +117,14 @@ public:
while (state_->idle.empty() && state_->inUse >= state_->max) while (state_->idle.empty() && state_->inUse >= state_->max)
state_.wait(wakeup); state_.wait(wakeup);
if (!state_->idle.empty()) { while (!state_->idle.empty()) {
auto p = state_->idle.back(); auto p = state_->idle.back();
state_->idle.pop_back(); state_->idle.pop_back();
if (validator(p)) {
state_->inUse++; state_->inUse++;
return Handle(*this, p); return Handle(*this, p);
} }
}
state_->inUse++; state_->inUse++;
} }

View file

@ -72,7 +72,17 @@ void FdSink::write(const unsigned char * data, size_t len)
warned = true; warned = true;
} }
} }
try {
writeFull(fd, data, len); writeFull(fd, data, len);
} catch (SysError & e) {
_good = true;
}
}
bool FdSink::good()
{
return _good;
} }
@ -119,12 +129,18 @@ size_t FdSource::readUnbuffered(unsigned char * data, size_t len)
checkInterrupt(); checkInterrupt();
n = ::read(fd, (char *) data, bufSize); n = ::read(fd, (char *) data, bufSize);
} while (n == -1 && errno == EINTR); } while (n == -1 && errno == EINTR);
if (n == -1) throw SysError("reading from file"); if (n == -1) { _good = false; throw SysError("reading from file"); }
if (n == 0) throw EndOfFile("unexpected end-of-file"); if (n == 0) { _good = false; throw EndOfFile("unexpected end-of-file"); }
return n; return n;
} }
bool FdSource::good()
{
return _good;
}
size_t StringSource::read(unsigned char * data, size_t len) size_t StringSource::read(unsigned char * data, size_t len)
{ {
if (pos == s.size()) throw EndOfFile("end of string reached"); if (pos == s.size()) throw EndOfFile("end of string reached");

View file

@ -12,6 +12,7 @@ struct Sink
{ {
virtual ~Sink() { } virtual ~Sink() { }
virtual void operator () (const unsigned char * data, size_t len) = 0; virtual void operator () (const unsigned char * data, size_t len) = 0;
virtual bool good() { return true; }
}; };
@ -25,7 +26,7 @@ struct BufferedSink : Sink
: bufSize(bufSize), bufPos(0), buffer(0) { } : bufSize(bufSize), bufPos(0), buffer(0) { }
~BufferedSink(); ~BufferedSink();
void operator () (const unsigned char * data, size_t len); void operator () (const unsigned char * data, size_t len) override;
void flush(); void flush();
@ -47,6 +48,8 @@ struct Source
return the number of bytes stored. If blocks until at least return the number of bytes stored. If blocks until at least
one byte is available. */ one byte is available. */
virtual size_t read(unsigned char * data, size_t len) = 0; virtual size_t read(unsigned char * data, size_t len) = 0;
virtual bool good() { return true; }
}; };
@ -60,7 +63,7 @@ struct BufferedSource : Source
: bufSize(bufSize), bufPosIn(0), bufPosOut(0), buffer(0) { } : bufSize(bufSize), bufPosIn(0), bufPosOut(0), buffer(0) { }
~BufferedSource(); ~BufferedSource();
size_t read(unsigned char * data, size_t len); size_t read(unsigned char * data, size_t len) override;
/* Underlying read call, to be overridden. */ /* Underlying read call, to be overridden. */
virtual size_t readUnbuffered(unsigned char * data, size_t len) = 0; virtual size_t readUnbuffered(unsigned char * data, size_t len) = 0;
@ -80,7 +83,12 @@ struct FdSink : BufferedSink
FdSink(int fd) : fd(fd), warn(false), written(0) { } FdSink(int fd) : fd(fd), warn(false), written(0) { }
~FdSink(); ~FdSink();
void write(const unsigned char * data, size_t len); void write(const unsigned char * data, size_t len) override;
bool good() override;
private:
bool _good = true;
}; };
@ -90,7 +98,10 @@ struct FdSource : BufferedSource
int fd; int fd;
FdSource() : fd(-1) { } FdSource() : fd(-1) { }
FdSource(int fd) : fd(fd) { } FdSource(int fd) : fd(fd) { }
size_t readUnbuffered(unsigned char * data, size_t len); size_t readUnbuffered(unsigned char * data, size_t len) override;
bool good() override;
private:
bool _good = true;
}; };
@ -98,7 +109,7 @@ struct FdSource : BufferedSource
struct StringSink : Sink struct StringSink : Sink
{ {
string s; string s;
void operator () (const unsigned char * data, size_t len); void operator () (const unsigned char * data, size_t len) override;
}; };
@ -108,7 +119,7 @@ struct StringSource : Source
const string & s; const string & s;
size_t pos; size_t pos;
StringSource(const string & _s) : s(_s), pos(0) { } StringSource(const string & _s) : s(_s), pos(0) { }
size_t read(unsigned char * data, size_t len); size_t read(unsigned char * data, size_t len) override;
}; };