RemoteStore: Make thread-safe

This allows a RemoteStore object to be used safely from multiple
threads concurrently. It will make multiple daemon connections if
necessary.

Note: pool.hh and sync.hh have been copied from the Hydra source tree.
This commit is contained in:
Eelco Dolstra 2016-02-23 15:00:59 +01:00
parent c0b7a8a0b5
commit e292144d46
4 changed files with 414 additions and 235 deletions

View file

@ -6,6 +6,7 @@
#include "affinity.hh" #include "affinity.hh"
#include "globals.hh" #include "globals.hh"
#include "derivations.hh" #include "derivations.hh"
#include "pool.hh"
#include <sys/types.h> #include <sys/types.h>
#include <sys/stat.h> #include <sys/stat.h>
@ -14,7 +15,6 @@
#include <errno.h> #include <errno.h>
#include <fcntl.h> #include <fcntl.h>
#include <iostream>
#include <unistd.h> #include <unistd.h>
#include <cstring> #include <cstring>
@ -40,61 +40,20 @@ template PathSet readStorePaths(Source & from);
RemoteStore::RemoteStore() RemoteStore::RemoteStore()
: connections(make_ref<Pool<Connection>>([this]() { return openConnection(); }))
{ {
initialised = false;
} }
void RemoteStore::openConnection(bool reserveSpace) ref<RemoteStore::Connection> RemoteStore::openConnection(bool reserveSpace)
{ {
if (initialised) return; auto conn = make_ref<Connection>();
initialised = true;
/* Connect to a daemon that does the privileged work for us. */ /* Connect to a daemon that does the privileged work for us. */
connectToDaemon(); conn->fd = socket(PF_UNIX, SOCK_STREAM, 0);
if (conn->fd == -1)
from.fd = fdSocket;
to.fd = fdSocket;
/* Send the magic greeting, check for the reply. */
try {
to << WORKER_MAGIC_1;
to.flush();
unsigned int magic = readInt(from);
if (magic != WORKER_MAGIC_2) throw Error("protocol mismatch");
daemonVersion = readInt(from);
if (GET_PROTOCOL_MAJOR(daemonVersion) != GET_PROTOCOL_MAJOR(PROTOCOL_VERSION))
throw Error("Nix daemon protocol version not supported");
to << PROTOCOL_VERSION;
if (GET_PROTOCOL_MINOR(daemonVersion) >= 14) {
int cpu = settings.lockCPU ? lockToCurrentCPU() : -1;
if (cpu != -1)
to << 1 << cpu;
else
to << 0;
}
if (GET_PROTOCOL_MINOR(daemonVersion) >= 11)
to << reserveSpace;
processStderr();
}
catch (Error & e) {
throw Error(format("cannot start daemon worker: %1%") % e.msg());
}
setOptions();
}
void RemoteStore::connectToDaemon()
{
fdSocket = socket(PF_UNIX, SOCK_STREAM, 0);
if (fdSocket == -1)
throw SysError("cannot create Unix domain socket"); throw SysError("cannot create Unix domain socket");
closeOnExec(fdSocket); closeOnExec(conn->fd);
string socketPath = settings.nixDaemonSocketFile; string socketPath = settings.nixDaemonSocketFile;
@ -111,111 +70,147 @@ void RemoteStore::connectToDaemon()
addr.sun_family = AF_UNIX; addr.sun_family = AF_UNIX;
if (socketPathRel.size() >= sizeof(addr.sun_path)) if (socketPathRel.size() >= sizeof(addr.sun_path))
throw Error(format("socket path %1% is too long") % socketPathRel); throw Error(format("socket path %1% is too long") % socketPathRel);
using namespace std;
strcpy(addr.sun_path, socketPathRel.c_str()); strcpy(addr.sun_path, socketPathRel.c_str());
if (connect(fdSocket, (struct sockaddr *) &addr, sizeof(addr)) == -1) if (connect(conn->fd, (struct sockaddr *) &addr, sizeof(addr)) == -1)
throw SysError(format("cannot connect to daemon at %1%") % socketPath); throw SysError(format("cannot connect to daemon at %1%") % socketPath);
if (fchdir(fdPrevDir) == -1) if (fchdir(fdPrevDir) == -1)
throw SysError("couldn't change back to previous directory"); throw SysError("couldn't change back to previous directory");
conn->from.fd = conn->fd;
conn->to.fd = conn->fd;
/* Send the magic greeting, check for the reply. */
try {
conn->to << WORKER_MAGIC_1;
conn->to.flush();
unsigned int magic = readInt(conn->from);
if (magic != WORKER_MAGIC_2) throw Error("protocol mismatch");
conn->daemonVersion = readInt(conn->from);
if (GET_PROTOCOL_MAJOR(conn->daemonVersion) != GET_PROTOCOL_MAJOR(PROTOCOL_VERSION))
throw Error("Nix daemon protocol version not supported");
conn->to << PROTOCOL_VERSION;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 14) {
int cpu = settings.lockCPU ? lockToCurrentCPU() : -1;
if (cpu != -1)
conn->to << 1 << cpu;
else
conn->to << 0;
}
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 11)
conn->to << reserveSpace;
conn->processStderr();
}
catch (Error & e) {
throw Error(format("cannot start daemon worker: %1%") % e.msg());
}
setOptions(conn);
return conn;
} }
RemoteStore::~RemoteStore() RemoteStore::~RemoteStore()
{ {
try { try {
to.flush(); //to.flush();
fdSocket.close(); //fdSocket.close();
// FIXME: close pool
} catch (...) { } catch (...) {
ignoreException(); ignoreException();
} }
} }
void RemoteStore::setOptions() void RemoteStore::setOptions(ref<Connection> conn)
{ {
to << wopSetOptions conn->to << wopSetOptions
<< settings.keepFailed << settings.keepFailed
<< settings.keepGoing << settings.keepGoing
<< settings.tryFallback << settings.tryFallback
<< verbosity << verbosity
<< settings.maxBuildJobs << settings.maxBuildJobs
<< settings.maxSilentTime; << settings.maxSilentTime;
if (GET_PROTOCOL_MINOR(daemonVersion) >= 2) if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 2)
to << settings.useBuildHook; conn->to << settings.useBuildHook;
if (GET_PROTOCOL_MINOR(daemonVersion) >= 4) if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 4)
to << settings.buildVerbosity conn->to << settings.buildVerbosity
<< logType << logType
<< settings.printBuildTrace; << settings.printBuildTrace;
if (GET_PROTOCOL_MINOR(daemonVersion) >= 6) if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 6)
to << settings.buildCores; conn->to << settings.buildCores;
if (GET_PROTOCOL_MINOR(daemonVersion) >= 10) if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 10)
to << settings.useSubstitutes; conn->to << settings.useSubstitutes;
if (GET_PROTOCOL_MINOR(daemonVersion) >= 12) { if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 12) {
Settings::SettingsMap overrides = settings.getOverrides(); Settings::SettingsMap overrides = settings.getOverrides();
if (overrides["ssh-auth-sock"] == "") if (overrides["ssh-auth-sock"] == "")
overrides["ssh-auth-sock"] = getEnv("SSH_AUTH_SOCK"); overrides["ssh-auth-sock"] = getEnv("SSH_AUTH_SOCK");
to << overrides.size(); conn->to << overrides.size();
for (auto & i : overrides) for (auto & i : overrides)
to << i.first << i.second; conn->to << i.first << i.second;
} }
processStderr(); conn->processStderr();
} }
bool RemoteStore::isValidPath(const Path & path) bool RemoteStore::isValidPath(const Path & path)
{ {
openConnection(); auto conn(connections->get());
to << wopIsValidPath << path; conn->to << wopIsValidPath << path;
processStderr(); conn->processStderr();
unsigned int reply = readInt(from); unsigned int reply = readInt(conn->from);
return reply != 0; return reply != 0;
} }
PathSet RemoteStore::queryValidPaths(const PathSet & paths) PathSet RemoteStore::queryValidPaths(const PathSet & paths)
{ {
openConnection(); auto conn(connections->get());
if (GET_PROTOCOL_MINOR(daemonVersion) < 12) { if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
PathSet res; PathSet res;
for (auto & i : paths) for (auto & i : paths)
if (isValidPath(i)) res.insert(i); if (isValidPath(i)) res.insert(i);
return res; return res;
} else { } else {
to << wopQueryValidPaths << paths; conn->to << wopQueryValidPaths << paths;
processStderr(); conn->processStderr();
return readStorePaths<PathSet>(from); return readStorePaths<PathSet>(conn->from);
} }
} }
PathSet RemoteStore::queryAllValidPaths() PathSet RemoteStore::queryAllValidPaths()
{ {
openConnection(); auto conn(connections->get());
to << wopQueryAllValidPaths; conn->to << wopQueryAllValidPaths;
processStderr(); conn->processStderr();
return readStorePaths<PathSet>(from); return readStorePaths<PathSet>(conn->from);
} }
PathSet RemoteStore::querySubstitutablePaths(const PathSet & paths) PathSet RemoteStore::querySubstitutablePaths(const PathSet & paths)
{ {
openConnection(); auto conn(connections->get());
if (GET_PROTOCOL_MINOR(daemonVersion) < 12) { if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
PathSet res; PathSet res;
for (auto & i : paths) { for (auto & i : paths) {
to << wopHasSubstitutes << i; conn->to << wopHasSubstitutes << i;
processStderr(); conn->processStderr();
if (readInt(from)) res.insert(i); if (readInt(conn->from)) res.insert(i);
} }
return res; return res;
} else { } else {
to << wopQuerySubstitutablePaths << paths; conn->to << wopQuerySubstitutablePaths << paths;
processStderr(); conn->processStderr();
return readStorePaths<PathSet>(from); return readStorePaths<PathSet>(conn->from);
} }
} }
@ -225,39 +220,39 @@ void RemoteStore::querySubstitutablePathInfos(const PathSet & paths,
{ {
if (paths.empty()) return; if (paths.empty()) return;
openConnection(); auto conn(connections->get());
if (GET_PROTOCOL_MINOR(daemonVersion) < 3) return; if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 3) return;
if (GET_PROTOCOL_MINOR(daemonVersion) < 12) { if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
for (auto & i : paths) { for (auto & i : paths) {
SubstitutablePathInfo info; SubstitutablePathInfo info;
to << wopQuerySubstitutablePathInfo << i; conn->to << wopQuerySubstitutablePathInfo << i;
processStderr(); conn->processStderr();
unsigned int reply = readInt(from); unsigned int reply = readInt(conn->from);
if (reply == 0) continue; if (reply == 0) continue;
info.deriver = readString(from); info.deriver = readString(conn->from);
if (info.deriver != "") assertStorePath(info.deriver); if (info.deriver != "") assertStorePath(info.deriver);
info.references = readStorePaths<PathSet>(from); info.references = readStorePaths<PathSet>(conn->from);
info.downloadSize = readLongLong(from); info.downloadSize = readLongLong(conn->from);
info.narSize = GET_PROTOCOL_MINOR(daemonVersion) >= 7 ? readLongLong(from) : 0; info.narSize = GET_PROTOCOL_MINOR(conn->daemonVersion) >= 7 ? readLongLong(conn->from) : 0;
infos[i] = info; infos[i] = info;
} }
} else { } else {
to << wopQuerySubstitutablePathInfos << paths; conn->to << wopQuerySubstitutablePathInfos << paths;
processStderr(); conn->processStderr();
unsigned int count = readInt(from); unsigned int count = readInt(conn->from);
for (unsigned int n = 0; n < count; n++) { for (unsigned int n = 0; n < count; n++) {
Path path = readStorePath(from); Path path = readStorePath(conn->from);
SubstitutablePathInfo & info(infos[path]); SubstitutablePathInfo & info(infos[path]);
info.deriver = readString(from); info.deriver = readString(conn->from);
if (info.deriver != "") assertStorePath(info.deriver); if (info.deriver != "") assertStorePath(info.deriver);
info.references = readStorePaths<PathSet>(from); info.references = readStorePaths<PathSet>(conn->from);
info.downloadSize = readLongLong(from); info.downloadSize = readLongLong(conn->from);
info.narSize = readLongLong(from); info.narSize = readLongLong(conn->from);
} }
} }
@ -266,27 +261,27 @@ void RemoteStore::querySubstitutablePathInfos(const PathSet & paths,
ValidPathInfo RemoteStore::queryPathInfo(const Path & path) ValidPathInfo RemoteStore::queryPathInfo(const Path & path)
{ {
openConnection(); auto conn(connections->get());
to << wopQueryPathInfo << path; conn->to << wopQueryPathInfo << path;
processStderr(); conn->processStderr();
ValidPathInfo info; ValidPathInfo info;
info.path = path; info.path = path;
info.deriver = readString(from); info.deriver = readString(conn->from);
if (info.deriver != "") assertStorePath(info.deriver); if (info.deriver != "") assertStorePath(info.deriver);
info.narHash = parseHash(htSHA256, readString(from)); info.narHash = parseHash(htSHA256, readString(conn->from));
info.references = readStorePaths<PathSet>(from); info.references = readStorePaths<PathSet>(conn->from);
info.registrationTime = readInt(from); info.registrationTime = readInt(conn->from);
info.narSize = readLongLong(from); info.narSize = readLongLong(conn->from);
return info; return info;
} }
Hash RemoteStore::queryPathHash(const Path & path) Hash RemoteStore::queryPathHash(const Path & path)
{ {
openConnection(); auto conn(connections->get());
to << wopQueryPathHash << path; conn->to << wopQueryPathHash << path;
processStderr(); conn->processStderr();
string hash = readString(from); string hash = readString(conn->from);
return parseHash(htSHA256, hash); return parseHash(htSHA256, hash);
} }
@ -294,10 +289,10 @@ Hash RemoteStore::queryPathHash(const Path & path)
void RemoteStore::queryReferences(const Path & path, void RemoteStore::queryReferences(const Path & path,
PathSet & references) PathSet & references)
{ {
openConnection(); auto conn(connections->get());
to << wopQueryReferences << path; conn->to << wopQueryReferences << path;
processStderr(); conn->processStderr();
PathSet references2 = readStorePaths<PathSet>(from); PathSet references2 = readStorePaths<PathSet>(conn->from);
references.insert(references2.begin(), references2.end()); references.insert(references2.begin(), references2.end());
} }
@ -305,20 +300,20 @@ void RemoteStore::queryReferences(const Path & path,
void RemoteStore::queryReferrers(const Path & path, void RemoteStore::queryReferrers(const Path & path,
PathSet & referrers) PathSet & referrers)
{ {
openConnection(); auto conn(connections->get());
to << wopQueryReferrers << path; conn->to << wopQueryReferrers << path;
processStderr(); conn->processStderr();
PathSet referrers2 = readStorePaths<PathSet>(from); PathSet referrers2 = readStorePaths<PathSet>(conn->from);
referrers.insert(referrers2.begin(), referrers2.end()); referrers.insert(referrers2.begin(), referrers2.end());
} }
Path RemoteStore::queryDeriver(const Path & path) Path RemoteStore::queryDeriver(const Path & path)
{ {
openConnection(); auto conn(connections->get());
to << wopQueryDeriver << path; conn->to << wopQueryDeriver << path;
processStderr(); conn->processStderr();
Path drvPath = readString(from); Path drvPath = readString(conn->from);
if (drvPath != "") assertStorePath(drvPath); if (drvPath != "") assertStorePath(drvPath);
return drvPath; return drvPath;
} }
@ -326,37 +321,37 @@ Path RemoteStore::queryDeriver(const Path & path)
PathSet RemoteStore::queryValidDerivers(const Path & path) PathSet RemoteStore::queryValidDerivers(const Path & path)
{ {
openConnection(); auto conn(connections->get());
to << wopQueryValidDerivers << path; conn->to << wopQueryValidDerivers << path;
processStderr(); conn->processStderr();
return readStorePaths<PathSet>(from); return readStorePaths<PathSet>(conn->from);
} }
PathSet RemoteStore::queryDerivationOutputs(const Path & path) PathSet RemoteStore::queryDerivationOutputs(const Path & path)
{ {
openConnection(); auto conn(connections->get());
to << wopQueryDerivationOutputs << path; conn->to << wopQueryDerivationOutputs << path;
processStderr(); conn->processStderr();
return readStorePaths<PathSet>(from); return readStorePaths<PathSet>(conn->from);
} }
PathSet RemoteStore::queryDerivationOutputNames(const Path & path) PathSet RemoteStore::queryDerivationOutputNames(const Path & path)
{ {
openConnection(); auto conn(connections->get());
to << wopQueryDerivationOutputNames << path; conn->to << wopQueryDerivationOutputNames << path;
processStderr(); conn->processStderr();
return readStrings<PathSet>(from); return readStrings<PathSet>(conn->from);
} }
Path RemoteStore::queryPathFromHashPart(const string & hashPart) Path RemoteStore::queryPathFromHashPart(const string & hashPart)
{ {
openConnection(); auto conn(connections->get());
to << wopQueryPathFromHashPart << hashPart; conn->to << wopQueryPathFromHashPart << hashPart;
processStderr(); conn->processStderr();
Path path = readString(from); Path path = readString(conn->from);
if (!path.empty()) assertStorePath(path); if (!path.empty()) assertStorePath(path);
return path; return path;
} }
@ -367,32 +362,32 @@ Path RemoteStore::addToStore(const string & name, const Path & _srcPath,
{ {
if (repair) throw Error("repairing is not supported when building through the Nix daemon"); if (repair) throw Error("repairing is not supported when building through the Nix daemon");
openConnection(); auto conn(connections->get());
Path srcPath(absPath(_srcPath)); Path srcPath(absPath(_srcPath));
to << wopAddToStore << name conn->to << wopAddToStore << name
<< ((hashAlgo == htSHA256 && recursive) ? 0 : 1) /* backwards compatibility hack */ << ((hashAlgo == htSHA256 && recursive) ? 0 : 1) /* backwards compatibility hack */
<< (recursive ? 1 : 0) << (recursive ? 1 : 0)
<< printHashType(hashAlgo); << printHashType(hashAlgo);
try { try {
to.written = 0; conn->to.written = 0;
to.warn = true; conn->to.warn = true;
dumpPath(srcPath, to, filter); dumpPath(srcPath, conn->to, filter);
to.warn = false; conn->to.warn = false;
processStderr(); conn->processStderr();
} catch (SysError & e) { } catch (SysError & e) {
/* Daemon closed while we were sending the path. Probably OOM /* Daemon closed while we were sending the path. Probably OOM
or I/O error. */ or I/O error. */
if (e.errNo == EPIPE) if (e.errNo == EPIPE)
try { try {
processStderr(); conn->processStderr();
} catch (EndOfFile & e) { } } catch (EndOfFile & e) { }
throw; throw;
} }
return readStorePath(from); return readStorePath(conn->from);
} }
@ -401,43 +396,43 @@ Path RemoteStore::addTextToStore(const string & name, const string & s,
{ {
if (repair) throw Error("repairing is not supported when building through the Nix daemon"); if (repair) throw Error("repairing is not supported when building through the Nix daemon");
openConnection(); auto conn(connections->get());
to << wopAddTextToStore << name << s << references; conn->to << wopAddTextToStore << name << s << references;
processStderr(); conn->processStderr();
return readStorePath(from); return readStorePath(conn->from);
} }
void RemoteStore::exportPath(const Path & path, bool sign, void RemoteStore::exportPath(const Path & path, bool sign,
Sink & sink) Sink & sink)
{ {
openConnection(); auto conn(connections->get());
to << wopExportPath << path << (sign ? 1 : 0); conn->to << wopExportPath << path << (sign ? 1 : 0);
processStderr(&sink); /* sink receives the actual data */ conn->processStderr(&sink); /* sink receives the actual data */
readInt(from); readInt(conn->from);
} }
Paths RemoteStore::importPaths(bool requireSignature, Source & source) Paths RemoteStore::importPaths(bool requireSignature, Source & source)
{ {
openConnection(); auto conn(connections->get());
to << wopImportPaths; conn->to << wopImportPaths;
/* We ignore requireSignature, since the worker forces it to true /* We ignore requireSignature, since the worker forces it to true
anyway. */ anyway. */
processStderr(0, &source); conn->processStderr(0, &source);
return readStorePaths<Paths>(from); return readStorePaths<Paths>(conn->from);
} }
void RemoteStore::buildPaths(const PathSet & drvPaths, BuildMode buildMode) void RemoteStore::buildPaths(const PathSet & drvPaths, BuildMode buildMode)
{ {
openConnection(); auto conn(connections->get());
to << wopBuildPaths; conn->to << wopBuildPaths;
if (GET_PROTOCOL_MINOR(daemonVersion) >= 13) { if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 13) {
to << drvPaths; conn->to << drvPaths;
if (GET_PROTOCOL_MINOR(daemonVersion) >= 15) if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 15)
to << buildMode; conn->to << buildMode;
else else
/* Old daemons did not take a 'buildMode' parameter, so we /* Old daemons did not take a 'buildMode' parameter, so we
need to validate it here on the client side. */ need to validate it here on the client side. */
@ -449,22 +444,22 @@ void RemoteStore::buildPaths(const PathSet & drvPaths, BuildMode buildMode)
PathSet drvPaths2; PathSet drvPaths2;
for (auto & i : drvPaths) for (auto & i : drvPaths)
drvPaths2.insert(string(i, 0, i.find('!'))); drvPaths2.insert(string(i, 0, i.find('!')));
to << drvPaths2; conn->to << drvPaths2;
} }
processStderr(); conn->processStderr();
readInt(from); readInt(conn->from);
} }
BuildResult RemoteStore::buildDerivation(const Path & drvPath, const BasicDerivation & drv, BuildResult RemoteStore::buildDerivation(const Path & drvPath, const BasicDerivation & drv,
BuildMode buildMode) BuildMode buildMode)
{ {
openConnection(); auto conn(connections->get());
to << wopBuildDerivation << drvPath << drv << buildMode; conn->to << wopBuildDerivation << drvPath << drv << buildMode;
processStderr(); conn->processStderr();
BuildResult res; BuildResult res;
unsigned int status; unsigned int status;
from >> status >> res.errorMsg; conn->from >> status >> res.errorMsg;
res.status = (BuildResult::Status) status; res.status = (BuildResult::Status) status;
return res; return res;
} }
@ -472,50 +467,50 @@ BuildResult RemoteStore::buildDerivation(const Path & drvPath, const BasicDeriva
void RemoteStore::ensurePath(const Path & path) void RemoteStore::ensurePath(const Path & path)
{ {
openConnection(); auto conn(connections->get());
to << wopEnsurePath << path; conn->to << wopEnsurePath << path;
processStderr(); conn->processStderr();
readInt(from); readInt(conn->from);
} }
void RemoteStore::addTempRoot(const Path & path) void RemoteStore::addTempRoot(const Path & path)
{ {
openConnection(); auto conn(connections->get());
to << wopAddTempRoot << path; conn->to << wopAddTempRoot << path;
processStderr(); conn->processStderr();
readInt(from); readInt(conn->from);
} }
void RemoteStore::addIndirectRoot(const Path & path) void RemoteStore::addIndirectRoot(const Path & path)
{ {
openConnection(); auto conn(connections->get());
to << wopAddIndirectRoot << path; conn->to << wopAddIndirectRoot << path;
processStderr(); conn->processStderr();
readInt(from); readInt(conn->from);
} }
void RemoteStore::syncWithGC() void RemoteStore::syncWithGC()
{ {
openConnection(); auto conn(connections->get());
to << wopSyncWithGC; conn->to << wopSyncWithGC;
processStderr(); conn->processStderr();
readInt(from); readInt(conn->from);
} }
Roots RemoteStore::findRoots() Roots RemoteStore::findRoots()
{ {
openConnection(); auto conn(connections->get());
to << wopFindRoots; conn->to << wopFindRoots;
processStderr(); conn->processStderr();
unsigned int count = readInt(from); unsigned int count = readInt(conn->from);
Roots result; Roots result;
while (count--) { while (count--) {
Path link = readString(from); Path link = readString(conn->from);
Path target = readStorePath(from); Path target = readStorePath(conn->from);
result[link] = target; result[link] = target;
} }
return result; return result;
@ -524,56 +519,56 @@ Roots RemoteStore::findRoots()
void RemoteStore::collectGarbage(const GCOptions & options, GCResults & results) void RemoteStore::collectGarbage(const GCOptions & options, GCResults & results)
{ {
openConnection(false); auto conn(connections->get());
to << wopCollectGarbage << options.action << options.pathsToDelete << options.ignoreLiveness conn->to << wopCollectGarbage << options.action << options.pathsToDelete << options.ignoreLiveness
<< options.maxFreed << 0; << options.maxFreed << 0;
if (GET_PROTOCOL_MINOR(daemonVersion) >= 5) if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 5)
/* removed options */ /* removed options */
to << 0 << 0; conn->to << 0 << 0;
processStderr(); conn->processStderr();
results.paths = readStrings<PathSet>(from); results.paths = readStrings<PathSet>(conn->from);
results.bytesFreed = readLongLong(from); results.bytesFreed = readLongLong(conn->from);
readLongLong(from); // obsolete readLongLong(conn->from); // obsolete
} }
PathSet RemoteStore::queryFailedPaths() PathSet RemoteStore::queryFailedPaths()
{ {
openConnection(); auto conn(connections->get());
to << wopQueryFailedPaths; conn->to << wopQueryFailedPaths;
processStderr(); conn->processStderr();
return readStorePaths<PathSet>(from); return readStorePaths<PathSet>(conn->from);
} }
void RemoteStore::clearFailedPaths(const PathSet & paths) void RemoteStore::clearFailedPaths(const PathSet & paths)
{ {
openConnection(); auto conn(connections->get());
to << wopClearFailedPaths << paths; conn->to << wopClearFailedPaths << paths;
processStderr(); conn->processStderr();
readInt(from); readInt(conn->from);
} }
void RemoteStore::optimiseStore() void RemoteStore::optimiseStore()
{ {
openConnection(); auto conn(connections->get());
to << wopOptimiseStore; conn->to << wopOptimiseStore;
processStderr(); conn->processStderr();
readInt(from); readInt(conn->from);
} }
bool RemoteStore::verifyStore(bool checkContents, bool repair) bool RemoteStore::verifyStore(bool checkContents, bool repair)
{ {
openConnection(); auto conn(connections->get());
to << wopVerifyStore << checkContents << repair; conn->to << wopVerifyStore << checkContents << repair;
processStderr(); conn->processStderr();
return readInt(from) != 0; return readInt(conn->from) != 0;
} }
void RemoteStore::processStderr(Sink * sink, Source * source) void RemoteStore::Connection::processStderr(Sink * sink, Source * source)
{ {
to.flush(); to.flush();
unsigned int msg; unsigned int msg;

View file

@ -12,6 +12,7 @@ class Pipe;
class Pid; class Pid;
struct FdSink; struct FdSink;
struct FdSource; struct FdSource;
template<typename T> class Pool;
class RemoteStore : public Store class RemoteStore : public Store
@ -91,19 +92,22 @@ public:
bool verifyStore(bool checkContents, bool repair) override; bool verifyStore(bool checkContents, bool repair) override;
private: private:
AutoCloseFD fdSocket;
FdSink to;
FdSource from;
unsigned int daemonVersion;
bool initialised;
void openConnection(bool reserveSpace = true); struct Connection
{
AutoCloseFD fd;
FdSink to;
FdSource from;
unsigned int daemonVersion;
void processStderr(Sink * sink = 0, Source * source = 0); void processStderr(Sink * sink = 0, Source * source = 0);
};
void connectToDaemon(); ref<Pool<Connection>> connections;
void setOptions(); ref<Connection> openConnection(bool reserveSpace = true);
void setOptions(ref<Connection> conn);
}; };

102
src/libutil/pool.hh Normal file
View file

@ -0,0 +1,102 @@
#pragma once
#include <memory>
#include <list>
#include <functional>
#include "sync.hh"
#include "ref.hh"
namespace nix {
/* This template class implements a simple pool manager of resources
of some type R, such as database connections. It is used as
follows:
class Connection { ... };
Pool<Connection> pool;
{
auto conn(pool.get());
conn->exec("select ...");
}
Here, the Connection object referenced by conn is automatically
returned to the pool when conn goes out of scope.
*/
template <class R>
class Pool
{
public:
typedef std::function<ref<R>()> Factory;
private:
Factory factory;
struct State
{
unsigned int count = 0;
std::list<ref<R>> idle;
};
Sync<State> state;
public:
Pool(const Factory & factory = []() { return make_ref<R>(); })
: factory(factory)
{ }
class Handle
{
private:
Pool & pool;
ref<R> r;
friend Pool;
Handle(Pool & pool, std::shared_ptr<R> r) : pool(pool), r(r) { }
public:
Handle(Handle && h) : pool(h.pool), r(h.r) { abort(); }
Handle(const Handle & l) = delete;
~Handle()
{
auto state_(pool.state.lock());
state_->idle.push_back(r);
}
R * operator -> () { return &*r; }
R & operator * () { return *r; }
};
Handle get()
{
{
auto state_(state.lock());
if (!state_->idle.empty()) {
auto p = state_->idle.back();
state_->idle.pop_back();
return Handle(*this, p);
}
state_->count++;
}
/* Note: we don't hold the lock while creating a new instance,
because creation might take a long time. */
return Handle(*this, factory());
}
unsigned int count()
{
auto state_(state.lock());
return state_->count;
}
};
}

78
src/libutil/sync.hh Normal file
View file

@ -0,0 +1,78 @@
#pragma once
#include <mutex>
#include <condition_variable>
#include <cassert>
namespace nix {
/* This template class ensures synchronized access to a value of type
T. It is used as follows:
struct Data { int x; ... };
Sync<Data> data;
{
auto data_(data.lock());
data_->x = 123;
}
Here, "data" is automatically unlocked when "data_" goes out of
scope.
*/
template<class T>
class Sync
{
private:
std::mutex mutex;
T data;
public:
Sync() { }
Sync(const T & data) : data(data) { }
class Lock
{
private:
Sync * s;
friend Sync;
Lock(Sync * s) : s(s) { s->mutex.lock(); }
public:
Lock(Lock && l) : s(l.s) { l.s = 0; }
Lock(const Lock & l) = delete;
~Lock() { if (s) s->mutex.unlock(); }
T * operator -> () { return &s->data; }
T & operator * () { return s->data; }
/* FIXME: performance impact of condition_variable_any? */
void wait(std::condition_variable_any & cv)
{
assert(s);
cv.wait(s->mutex);
}
template<class Rep, class Period, class Predicate>
bool wait_for(std::condition_variable_any & cv,
const std::chrono::duration<Rep, Period> & duration,
Predicate pred)
{
assert(s);
return cv.wait_for(s->mutex, duration, pred);
}
template<class Clock, class Duration>
std::cv_status wait_until(std::condition_variable_any & cv,
const std::chrono::time_point<Clock, Duration> & duration)
{
assert(s);
return cv.wait_until(s->mutex, duration);
}
};
Lock lock() { return Lock(this); }
};
}