RemoteStore: Close connection if an exception occurs

Fixes #2075.
This commit is contained in:
Eelco Dolstra 2018-10-16 23:36:15 +02:00
parent ba51100d64
commit 79e358ce6d
3 changed files with 121 additions and 63 deletions

View file

@ -161,7 +161,8 @@ void RemoteStore::initConnection(Connection & conn)
if (GET_PROTOCOL_MINOR(conn.daemonVersion) >= 11) if (GET_PROTOCOL_MINOR(conn.daemonVersion) >= 11)
conn.to << false; conn.to << false;
conn.processStderr(); auto ex = conn.processStderr();
if (ex) std::rethrow_exception(ex);
} }
catch (Error & e) { catch (Error & e) {
throw Error("cannot open connection to remote store '%s': %s", getUri(), e.what()); throw Error("cannot open connection to remote store '%s': %s", getUri(), e.what());
@ -195,22 +196,68 @@ void RemoteStore::setOptions(Connection & conn)
conn.to << i.first << i.second.value; conn.to << i.first << i.second.value;
} }
conn.processStderr(); auto ex = conn.processStderr();
if (ex) std::rethrow_exception(ex);
}
/* A wrapper around Pool<RemoteStore::Connection>::Handle that marks
the connection as bad (causing it to be closed) if a non-daemon
exception is thrown before the handle is closed. Such an exception
causes a deviation from the expected protocol and therefore a
desynchronization between the client and daemon. */
struct ConnectionHandle
{
Pool<RemoteStore::Connection>::Handle handle;
bool daemonException = false;
ConnectionHandle(Pool<RemoteStore::Connection>::Handle && handle)
: handle(std::move(handle))
{ }
ConnectionHandle(ConnectionHandle && h)
: handle(std::move(h.handle))
{ }
~ConnectionHandle()
{
if (!daemonException && std::uncaught_exception()) {
handle.markBad();
debug("closing daemon connection because of an exception");
}
}
RemoteStore::Connection * operator -> () { return &*handle; }
void processStderr(Sink * sink = 0, Source * source = 0)
{
auto ex = handle->processStderr(sink, source);
if (ex) {
daemonException = true;
std::rethrow_exception(ex);
}
}
};
ConnectionHandle RemoteStore::getConnection()
{
return ConnectionHandle(connections->get());
} }
bool RemoteStore::isValidPathUncached(const Path & path) bool RemoteStore::isValidPathUncached(const Path & path)
{ {
auto conn(connections->get()); auto conn(getConnection());
conn->to << wopIsValidPath << path; conn->to << wopIsValidPath << path;
conn->processStderr(); conn.processStderr();
return readInt(conn->from); return readInt(conn->from);
} }
PathSet RemoteStore::queryValidPaths(const PathSet & paths, SubstituteFlag maybeSubstitute) PathSet RemoteStore::queryValidPaths(const PathSet & paths, SubstituteFlag maybeSubstitute)
{ {
auto conn(connections->get()); auto conn(getConnection());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) { if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
PathSet res; PathSet res;
for (auto & i : paths) for (auto & i : paths)
@ -218,7 +265,7 @@ PathSet RemoteStore::queryValidPaths(const PathSet & paths, SubstituteFlag maybe
return res; return res;
} else { } else {
conn->to << wopQueryValidPaths << paths; conn->to << wopQueryValidPaths << paths;
conn->processStderr(); conn.processStderr();
return readStorePaths<PathSet>(*this, conn->from); return readStorePaths<PathSet>(*this, conn->from);
} }
} }
@ -226,27 +273,27 @@ PathSet RemoteStore::queryValidPaths(const PathSet & paths, SubstituteFlag maybe
PathSet RemoteStore::queryAllValidPaths() PathSet RemoteStore::queryAllValidPaths()
{ {
auto conn(connections->get()); auto conn(getConnection());
conn->to << wopQueryAllValidPaths; conn->to << wopQueryAllValidPaths;
conn->processStderr(); conn.processStderr();
return readStorePaths<PathSet>(*this, conn->from); return readStorePaths<PathSet>(*this, conn->from);
} }
PathSet RemoteStore::querySubstitutablePaths(const PathSet & paths) PathSet RemoteStore::querySubstitutablePaths(const PathSet & paths)
{ {
auto conn(connections->get()); auto conn(getConnection());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) { if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
PathSet res; PathSet res;
for (auto & i : paths) { for (auto & i : paths) {
conn->to << wopHasSubstitutes << i; conn->to << wopHasSubstitutes << i;
conn->processStderr(); conn.processStderr();
if (readInt(conn->from)) res.insert(i); if (readInt(conn->from)) res.insert(i);
} }
return res; return res;
} else { } else {
conn->to << wopQuerySubstitutablePaths << paths; conn->to << wopQuerySubstitutablePaths << paths;
conn->processStderr(); conn.processStderr();
return readStorePaths<PathSet>(*this, conn->from); return readStorePaths<PathSet>(*this, conn->from);
} }
} }
@ -257,14 +304,14 @@ void RemoteStore::querySubstitutablePathInfos(const PathSet & paths,
{ {
if (paths.empty()) return; if (paths.empty()) return;
auto conn(connections->get()); auto conn(getConnection());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) { if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
for (auto & i : paths) { for (auto & i : paths) {
SubstitutablePathInfo info; SubstitutablePathInfo info;
conn->to << wopQuerySubstitutablePathInfo << i; conn->to << wopQuerySubstitutablePathInfo << i;
conn->processStderr(); conn.processStderr();
unsigned int reply = readInt(conn->from); unsigned int reply = readInt(conn->from);
if (reply == 0) continue; if (reply == 0) continue;
info.deriver = readString(conn->from); info.deriver = readString(conn->from);
@ -278,7 +325,7 @@ void RemoteStore::querySubstitutablePathInfos(const PathSet & paths,
} else { } else {
conn->to << wopQuerySubstitutablePathInfos << paths; conn->to << wopQuerySubstitutablePathInfos << paths;
conn->processStderr(); conn.processStderr();
size_t count = readNum<size_t>(conn->from); size_t count = readNum<size_t>(conn->from);
for (size_t n = 0; n < count; n++) { for (size_t n = 0; n < count; n++) {
Path path = readStorePath(*this, conn->from); Path path = readStorePath(*this, conn->from);
@ -300,10 +347,10 @@ void RemoteStore::queryPathInfoUncached(const Path & path,
try { try {
std::shared_ptr<ValidPathInfo> info; std::shared_ptr<ValidPathInfo> info;
{ {
auto conn(connections->get()); auto conn(getConnection());
conn->to << wopQueryPathInfo << path; conn->to << wopQueryPathInfo << path;
try { try {
conn->processStderr(); conn.processStderr();
} catch (Error & e) { } catch (Error & e) {
// Ugly backwards compatibility hack. // Ugly backwards compatibility hack.
if (e.msg().find("is not valid") != std::string::npos) if (e.msg().find("is not valid") != std::string::npos)
@ -335,9 +382,9 @@ void RemoteStore::queryPathInfoUncached(const Path & path,
void RemoteStore::queryReferrers(const Path & path, void RemoteStore::queryReferrers(const Path & path,
PathSet & referrers) PathSet & referrers)
{ {
auto conn(connections->get()); auto conn(getConnection());
conn->to << wopQueryReferrers << path; conn->to << wopQueryReferrers << path;
conn->processStderr(); conn.processStderr();
PathSet referrers2 = readStorePaths<PathSet>(*this, conn->from); PathSet referrers2 = readStorePaths<PathSet>(*this, conn->from);
referrers.insert(referrers2.begin(), referrers2.end()); referrers.insert(referrers2.begin(), referrers2.end());
} }
@ -345,36 +392,36 @@ void RemoteStore::queryReferrers(const Path & path,
PathSet RemoteStore::queryValidDerivers(const Path & path) PathSet RemoteStore::queryValidDerivers(const Path & path)
{ {
auto conn(connections->get()); auto conn(getConnection());
conn->to << wopQueryValidDerivers << path; conn->to << wopQueryValidDerivers << path;
conn->processStderr(); conn.processStderr();
return readStorePaths<PathSet>(*this, conn->from); return readStorePaths<PathSet>(*this, conn->from);
} }
PathSet RemoteStore::queryDerivationOutputs(const Path & path) PathSet RemoteStore::queryDerivationOutputs(const Path & path)
{ {
auto conn(connections->get()); auto conn(getConnection());
conn->to << wopQueryDerivationOutputs << path; conn->to << wopQueryDerivationOutputs << path;
conn->processStderr(); conn.processStderr();
return readStorePaths<PathSet>(*this, conn->from); return readStorePaths<PathSet>(*this, conn->from);
} }
PathSet RemoteStore::queryDerivationOutputNames(const Path & path) PathSet RemoteStore::queryDerivationOutputNames(const Path & path)
{ {
auto conn(connections->get()); auto conn(getConnection());
conn->to << wopQueryDerivationOutputNames << path; conn->to << wopQueryDerivationOutputNames << path;
conn->processStderr(); conn.processStderr();
return readStrings<PathSet>(conn->from); return readStrings<PathSet>(conn->from);
} }
Path RemoteStore::queryPathFromHashPart(const string & hashPart) Path RemoteStore::queryPathFromHashPart(const string & hashPart)
{ {
auto conn(connections->get()); auto conn(getConnection());
conn->to << wopQueryPathFromHashPart << hashPart; conn->to << wopQueryPathFromHashPart << hashPart;
conn->processStderr(); conn.processStderr();
Path path = readString(conn->from); Path path = readString(conn->from);
if (!path.empty()) assertStorePath(path); if (!path.empty()) assertStorePath(path);
return path; return path;
@ -384,7 +431,7 @@ Path RemoteStore::queryPathFromHashPart(const string & hashPart)
void RemoteStore::addToStore(const ValidPathInfo & info, Source & source, void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
RepairFlag repair, CheckSigsFlag checkSigs, std::shared_ptr<FSAccessor> accessor) RepairFlag repair, CheckSigsFlag checkSigs, std::shared_ptr<FSAccessor> accessor)
{ {
auto conn(connections->get()); auto conn(getConnection());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 18) { if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 18) {
conn->to << wopImportPaths; conn->to << wopImportPaths;
@ -403,7 +450,7 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
; ;
}); });
conn->processStderr(0, source2.get()); conn.processStderr(0, source2.get());
auto importedPaths = readStorePaths<PathSet>(*this, conn->from); auto importedPaths = readStorePaths<PathSet>(*this, conn->from);
assert(importedPaths.size() <= 1); assert(importedPaths.size() <= 1);
@ -417,7 +464,7 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
<< repair << !checkSigs; << repair << !checkSigs;
bool tunnel = GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21; bool tunnel = GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21;
if (!tunnel) copyNAR(source, conn->to); if (!tunnel) copyNAR(source, conn->to);
conn->processStderr(0, tunnel ? &source : nullptr); conn.processStderr(0, tunnel ? &source : nullptr);
} }
} }
@ -427,7 +474,7 @@ Path RemoteStore::addToStore(const string & name, const Path & _srcPath,
{ {
if (repair) throw Error("repairing is not supported when building through the Nix daemon"); if (repair) throw Error("repairing is not supported when building through the Nix daemon");
auto conn(connections->get()); auto conn(getConnection());
Path srcPath(absPath(_srcPath)); Path srcPath(absPath(_srcPath));
@ -445,13 +492,13 @@ Path RemoteStore::addToStore(const string & name, const Path & _srcPath,
dumpPath(srcPath, conn->to, filter); dumpPath(srcPath, conn->to, filter);
} }
conn->to.warn = false; conn->to.warn = false;
conn->processStderr(); conn.processStderr();
} catch (SysError & e) { } catch (SysError & e) {
/* Daemon closed while we were sending the path. Probably OOM /* Daemon closed while we were sending the path. Probably OOM
or I/O error. */ or I/O error. */
if (e.errNo == EPIPE) if (e.errNo == EPIPE)
try { try {
conn->processStderr(); conn.processStderr();
} catch (EndOfFile & e) { } } catch (EndOfFile & e) { }
throw; throw;
} }
@ -465,17 +512,17 @@ Path RemoteStore::addTextToStore(const string & name, const string & s,
{ {
if (repair) throw Error("repairing is not supported when building through the Nix daemon"); if (repair) throw Error("repairing is not supported when building through the Nix daemon");
auto conn(connections->get()); auto conn(getConnection());
conn->to << wopAddTextToStore << name << s << references; conn->to << wopAddTextToStore << name << s << references;
conn->processStderr(); conn.processStderr();
return readStorePath(*this, conn->from); return readStorePath(*this, conn->from);
} }
void RemoteStore::buildPaths(const PathSet & drvPaths, BuildMode buildMode) void RemoteStore::buildPaths(const PathSet & drvPaths, BuildMode buildMode)
{ {
auto conn(connections->get()); auto conn(getConnection());
conn->to << wopBuildPaths; conn->to << wopBuildPaths;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 13) { if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 13) {
conn->to << drvPaths; conn->to << drvPaths;
@ -494,7 +541,7 @@ void RemoteStore::buildPaths(const PathSet & drvPaths, BuildMode buildMode)
drvPaths2.insert(string(i, 0, i.find('!'))); drvPaths2.insert(string(i, 0, i.find('!')));
conn->to << drvPaths2; conn->to << drvPaths2;
} }
conn->processStderr(); conn.processStderr();
readInt(conn->from); readInt(conn->from);
} }
@ -502,9 +549,9 @@ void RemoteStore::buildPaths(const PathSet & drvPaths, BuildMode buildMode)
BuildResult RemoteStore::buildDerivation(const Path & drvPath, const BasicDerivation & drv, BuildResult RemoteStore::buildDerivation(const Path & drvPath, const BasicDerivation & drv,
BuildMode buildMode) BuildMode buildMode)
{ {
auto conn(connections->get()); auto conn(getConnection());
conn->to << wopBuildDerivation << drvPath << drv << buildMode; conn->to << wopBuildDerivation << drvPath << drv << buildMode;
conn->processStderr(); conn.processStderr();
BuildResult res; BuildResult res;
unsigned int status; unsigned int status;
conn->from >> status >> res.errorMsg; conn->from >> status >> res.errorMsg;
@ -515,45 +562,45 @@ BuildResult RemoteStore::buildDerivation(const Path & drvPath, const BasicDeriva
void RemoteStore::ensurePath(const Path & path) void RemoteStore::ensurePath(const Path & path)
{ {
auto conn(connections->get()); auto conn(getConnection());
conn->to << wopEnsurePath << path; conn->to << wopEnsurePath << path;
conn->processStderr(); conn.processStderr();
readInt(conn->from); readInt(conn->from);
} }
void RemoteStore::addTempRoot(const Path & path) void RemoteStore::addTempRoot(const Path & path)
{ {
auto conn(connections->get()); auto conn(getConnection());
conn->to << wopAddTempRoot << path; conn->to << wopAddTempRoot << path;
conn->processStderr(); conn.processStderr();
readInt(conn->from); readInt(conn->from);
} }
void RemoteStore::addIndirectRoot(const Path & path) void RemoteStore::addIndirectRoot(const Path & path)
{ {
auto conn(connections->get()); auto conn(getConnection());
conn->to << wopAddIndirectRoot << path; conn->to << wopAddIndirectRoot << path;
conn->processStderr(); conn.processStderr();
readInt(conn->from); readInt(conn->from);
} }
void RemoteStore::syncWithGC() void RemoteStore::syncWithGC()
{ {
auto conn(connections->get()); auto conn(getConnection());
conn->to << wopSyncWithGC; conn->to << wopSyncWithGC;
conn->processStderr(); conn.processStderr();
readInt(conn->from); readInt(conn->from);
} }
Roots RemoteStore::findRoots() Roots RemoteStore::findRoots()
{ {
auto conn(connections->get()); auto conn(getConnection());
conn->to << wopFindRoots; conn->to << wopFindRoots;
conn->processStderr(); conn.processStderr();
size_t count = readNum<size_t>(conn->from); size_t count = readNum<size_t>(conn->from);
Roots result; Roots result;
while (count--) { while (count--) {
@ -567,7 +614,7 @@ Roots RemoteStore::findRoots()
void RemoteStore::collectGarbage(const GCOptions & options, GCResults & results) void RemoteStore::collectGarbage(const GCOptions & options, GCResults & results)
{ {
auto conn(connections->get()); auto conn(getConnection());
conn->to conn->to
<< wopCollectGarbage << options.action << options.pathsToDelete << options.ignoreLiveness << wopCollectGarbage << options.action << options.pathsToDelete << options.ignoreLiveness
@ -575,7 +622,7 @@ void RemoteStore::collectGarbage(const GCOptions & options, GCResults & results)
/* removed options */ /* removed options */
<< 0 << 0 << 0; << 0 << 0 << 0;
conn->processStderr(); conn.processStderr();
results.paths = readStrings<PathSet>(conn->from); results.paths = readStrings<PathSet>(conn->from);
results.bytesFreed = readLongLong(conn->from); results.bytesFreed = readLongLong(conn->from);
@ -590,27 +637,27 @@ void RemoteStore::collectGarbage(const GCOptions & options, GCResults & results)
void RemoteStore::optimiseStore() void RemoteStore::optimiseStore()
{ {
auto conn(connections->get()); auto conn(getConnection());
conn->to << wopOptimiseStore; conn->to << wopOptimiseStore;
conn->processStderr(); conn.processStderr();
readInt(conn->from); readInt(conn->from);
} }
bool RemoteStore::verifyStore(bool checkContents, RepairFlag repair) bool RemoteStore::verifyStore(bool checkContents, RepairFlag repair)
{ {
auto conn(connections->get()); auto conn(getConnection());
conn->to << wopVerifyStore << checkContents << repair; conn->to << wopVerifyStore << checkContents << repair;
conn->processStderr(); conn.processStderr();
return readInt(conn->from); return readInt(conn->from);
} }
void RemoteStore::addSignatures(const Path & storePath, const StringSet & sigs) void RemoteStore::addSignatures(const Path & storePath, const StringSet & sigs)
{ {
auto conn(connections->get()); auto conn(getConnection());
conn->to << wopAddSignatures << storePath << sigs; conn->to << wopAddSignatures << storePath << sigs;
conn->processStderr(); conn.processStderr();
readInt(conn->from); readInt(conn->from);
} }
@ -620,13 +667,13 @@ void RemoteStore::queryMissing(const PathSet & targets,
unsigned long long & downloadSize, unsigned long long & narSize) unsigned long long & downloadSize, unsigned long long & narSize)
{ {
{ {
auto conn(connections->get()); auto conn(getConnection());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 19) if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 19)
// Don't hold the connection handle in the fallback case // Don't hold the connection handle in the fallback case
// to prevent a deadlock. // to prevent a deadlock.
goto fallback; goto fallback;
conn->to << wopQueryMissing << targets; conn->to << wopQueryMissing << targets;
conn->processStderr(); conn.processStderr();
willBuild = readStorePaths<PathSet>(*this, conn->from); willBuild = readStorePaths<PathSet>(*this, conn->from);
willSubstitute = readStorePaths<PathSet>(*this, conn->from); willSubstitute = readStorePaths<PathSet>(*this, conn->from);
unknown = readStorePaths<PathSet>(*this, conn->from); unknown = readStorePaths<PathSet>(*this, conn->from);
@ -642,7 +689,7 @@ void RemoteStore::queryMissing(const PathSet & targets,
void RemoteStore::connect() void RemoteStore::connect()
{ {
auto conn(connections->get()); auto conn(getConnection());
} }
@ -679,7 +726,7 @@ static Logger::Fields readFields(Source & from)
} }
void RemoteStore::Connection::processStderr(Sink * sink, Source * source) std::exception_ptr RemoteStore::Connection::processStderr(Sink * sink, Source * source)
{ {
to.flush(); to.flush();
@ -704,7 +751,7 @@ void RemoteStore::Connection::processStderr(Sink * sink, Source * source)
else if (msg == STDERR_ERROR) { else if (msg == STDERR_ERROR) {
string error = readString(from); string error = readString(from);
unsigned int status = readInt(from); unsigned int status = readInt(from);
throw Error(status, error); return std::make_exception_ptr(Error(status, error));
} }
else if (msg == STDERR_NEXT) else if (msg == STDERR_NEXT)
@ -738,6 +785,8 @@ void RemoteStore::Connection::processStderr(Sink * sink, Source * source)
else else
throw Error("got unknown message type %x from Nix daemon", msg); throw Error("got unknown message type %x from Nix daemon", msg);
} }
return nullptr;
} }
static std::string uriScheme = "unix://"; static std::string uriScheme = "unix://";

View file

@ -14,6 +14,7 @@ class Pid;
struct FdSink; struct FdSink;
struct FdSource; struct FdSource;
template<typename T> class Pool; template<typename T> class Pool;
struct ConnectionHandle;
/* FIXME: RemoteStore is a misnomer - should be something like /* FIXME: RemoteStore is a misnomer - should be something like
@ -111,7 +112,7 @@ protected:
virtual ~Connection(); virtual ~Connection();
void processStderr(Sink * sink = 0, Source * source = 0); std::exception_ptr processStderr(Sink * sink = 0, Source * source = 0);
}; };
ref<Connection> openConnectionWrapper(); ref<Connection> openConnectionWrapper();
@ -124,6 +125,10 @@ protected:
virtual void setOptions(Connection & conn); virtual void setOptions(Connection & conn);
ConnectionHandle getConnection();
friend class ConnectionHandle;
private: private:
std::atomic_bool failed{false}; std::atomic_bool failed{false};

View file

@ -97,6 +97,7 @@ public:
private: private:
Pool & pool; Pool & pool;
std::shared_ptr<R> r; std::shared_ptr<R> r;
bool bad = false;
friend Pool; friend Pool;
@ -112,7 +113,8 @@ public:
if (!r) return; if (!r) return;
{ {
auto state_(pool.state.lock()); auto state_(pool.state.lock());
state_->idle.push_back(ref<R>(r)); if (!bad)
state_->idle.push_back(ref<R>(r));
assert(state_->inUse); assert(state_->inUse);
state_->inUse--; state_->inUse--;
} }
@ -121,6 +123,8 @@ public:
R * operator -> () { return &*r; } R * operator -> () { return &*r; }
R & operator * () { return *r; } R & operator * () { return *r; }
void markBad() { bad = true; }
}; };
Handle get() Handle get()