lix/src/libstore/remote-store.cc
Eelco Dolstra e0204f8d46 Move path info caching from BinaryCacheStore to Store
Caching path info is generally useful. For instance, it speeds up "nix
path-info -rS /run/current-system" (i.e. showing the closure sizes of
all paths in the closure of the current system) from 5.6s to 0.15s.

This also eliminates some APIs like Store::queryDeriver() and
Store::queryReferences().
2016-04-19 18:52:53 +02:00

569 lines
16 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "serialise.hh"
#include "util.hh"
#include "remote-store.hh"
#include "worker-protocol.hh"
#include "archive.hh"
#include "affinity.hh"
#include "globals.hh"
#include "derivations.hh"
#include "pool.hh"
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <cstring>
namespace nix {
/* Read one string from `from`, run it through assertStorePath() and
   hand it back to the caller. */
Path readStorePath(Source & from)
{
    Path storePath = readString(from);
    assertStorePath(storePath);
    return storePath;
}
/* Read a collection of type T from `from` using readStrings<T>() and
   run every element through assertStorePath() before returning it. */
template<class T> T readStorePaths(Source & from)
{
    T result = readStrings<T>(from);
    for (auto & entry : result)
        assertStorePath(entry);
    return result;
}

/* Explicit instantiation for PathSet. */
template PathSet readStorePaths(Source & from);
/* Create the connection pool. The pool is given `maxConnections`, a
   factory that calls openConnection(), and a predicate that is true
   while both streams of a connection report good() (presumably used
   by the pool to decide whether a connection can be reused). */
RemoteStore::RemoteStore(size_t maxConnections)
    : connections(make_ref<Pool<Connection>>(
            maxConnections,
            [this]() { return openConnection(); },
            [](const ref<Connection> & c) {
                return c->to.good() && c->from.good();
            }))
{
}
/* Open a new connection to the daemon listening on the Unix domain
   socket named by settings.nixDaemonSocketFile, perform the protocol
   handshake and send the current settings with setOptions().

   Throws SysError if the socket cannot be created or connected, and
   Error if the socket path is too long for sockaddr_un or if the
   handshake fails. */
ref<RemoteStore::Connection> RemoteStore::openConnection()
{
    auto conn = make_ref<Connection>();

    /* Connect to a daemon that does the privileged work for us. */
    conn->fd = socket(PF_UNIX, SOCK_STREAM, 0);
    if (conn->fd == -1)
        throw SysError("cannot create Unix domain socket");
    closeOnExec(conn->fd);

    string daemonSocket = settings.nixDaemonSocketFile;

    struct sockaddr_un sockAddr;
    sockAddr.sun_family = AF_UNIX;
    /* Reject paths that do not fit into the fixed-size sun_path buffer
       before copying into it. */
    if (daemonSocket.size() + 1 >= sizeof(sockAddr.sun_path))
        throw Error(format("socket path %1% is too long") % daemonSocket);
    strcpy(sockAddr.sun_path, daemonSocket.c_str());

    if (connect(conn->fd, (struct sockaddr *) &sockAddr, sizeof(sockAddr)) == -1)
        throw SysError(format("cannot connect to daemon at %1%") % daemonSocket);

    /* Both stream directions use the same socket. */
    conn->from.fd = conn->fd;
    conn->to.fd = conn->fd;

    /* Send the magic greeting, check for the reply. */
    try {
        conn->to << WORKER_MAGIC_1;
        conn->to.flush();
        unsigned int greeting = readInt(conn->from);
        if (greeting != WORKER_MAGIC_2)
            throw Error("protocol mismatch");

        conn->daemonVersion = readInt(conn->from);
        if (GET_PROTOCOL_MAJOR(conn->daemonVersion) != GET_PROTOCOL_MAJOR(PROTOCOL_VERSION))
            throw Error("Nix daemon protocol version not supported");
        conn->to << PROTOCOL_VERSION;

        /* Protocol minor >= 14: send a flag and, if we locked ourselves
           to a CPU, the CPU number. */
        if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 14) {
            int cpu = settings.lockCPU ? lockToCurrentCPU() : -1;
            if (cpu == -1)
                conn->to << 0;
            else
                conn->to << 1 << cpu;
        }

        if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 11)
            conn->to << false;

        conn->processStderr();
    }
    catch (Error & e) {
        throw Error(format("cannot start daemon worker: %1%") % e.msg());
    }

    setOptions(conn);

    return conn;
}
/* Send a wopSetOptions request carrying the client's current settings
   over `conn`. Fields are appended depending on the protocol minor
   version the daemon announced, then processStderr() waits for the
   daemon to finish handling the request. */
void RemoteStore::setOptions(ref<Connection> conn)
{
    const auto daemonMinor = GET_PROTOCOL_MINOR(conn->daemonVersion);

    conn->to << wopSetOptions
             << settings.keepFailed
             << settings.keepGoing
             << settings.tryFallback
             << verbosity
             << settings.maxBuildJobs
             << settings.maxSilentTime;

    if (daemonMinor >= 2)
        conn->to << settings.useBuildHook;

    if (daemonMinor >= 4)
        conn->to << settings.buildVerbosity
                 << logType
                 << settings.printBuildTrace;

    if (daemonMinor >= 6)
        conn->to << settings.buildCores;

    if (daemonMinor >= 10)
        conn->to << settings.useSubstitutes;

    if (daemonMinor >= 12) {
        Settings::SettingsMap overrides = settings.getOverrides();
        /* Make sure an "ssh-auth-sock" entry is always present; when it
           is empty, fill it from the SSH_AUTH_SOCK environment
           variable. */
        auto & sshAuthSock = overrides["ssh-auth-sock"];
        if (sshAuthSock == "")
            sshAuthSock = getEnv("SSH_AUTH_SOCK");
        conn->to << overrides.size();
        for (auto & entry : overrides)
            conn->to << entry.first << entry.second;
    }

    conn->processStderr();
}
/* Ask the daemon (wopIsValidPath) about `path`; returns true iff the
   integer reply is non-zero. */
bool RemoteStore::isValidPathUncached(const Path & path)
{
    auto conn(connections->get());

    conn->to << wopIsValidPath << path;
    conn->processStderr();

    unsigned int answer = readInt(conn->from);
    return answer != 0;
}
/* Return the subset of `paths` that the daemon reports as valid.

   Daemons with protocol minor >= 12 answer a single
   wopQueryValidPaths request. Older daemons are asked about each path
   individually with wopIsValidPath.

   The per-path queries are sent on the connection this function
   already holds. Going through isValidPath() here instead would need
   a second connection from the pool for every lookup that reaches
   the daemon while this one is still checked out, which can exhaust
   the pool (e.g. with maxConnections == 1). */
PathSet RemoteStore::queryValidPaths(const PathSet & paths)
{
    auto conn(connections->get());

    if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 12) {
        conn->to << wopQueryValidPaths << paths;
        conn->processStderr();
        return readStorePaths<PathSet>(conn->from);
    }

    /* Legacy daemon: one wopIsValidPath round trip per path, same wire
       exchange as isValidPathUncached(). */
    PathSet res;
    for (auto & i : paths) {
        conn->to << wopIsValidPath << i;
        conn->processStderr();
        unsigned int reply = readInt(conn->from);
        if (reply != 0) res.insert(i);
    }
    return res;
}
/* Send wopQueryAllValidPaths and return the set of store paths the
   daemon replies with. */
PathSet RemoteStore::queryAllValidPaths()
{
    auto conn(connections->get());

    conn->to << wopQueryAllValidPaths;
    conn->processStderr();

    return readStorePaths<PathSet>(conn->from);
}
/* Return the subset of `paths` for which the daemon reports
   substitutes. Daemons with protocol minor >= 12 handle the whole set
   in one wopQuerySubstitutablePaths request; older ones are asked per
   path with wopHasSubstitutes. */
PathSet RemoteStore::querySubstitutablePaths(const PathSet & paths)
{
    auto conn(connections->get());

    if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 12) {
        conn->to << wopQuerySubstitutablePaths << paths;
        conn->processStderr();
        return readStorePaths<PathSet>(conn->from);
    }

    /* Legacy daemon: one request per path on the same connection. */
    PathSet substitutable;
    for (auto & candidate : paths) {
        conn->to << wopHasSubstitutes << candidate;
        conn->processStderr();
        if (readInt(conn->from))
            substitutable.insert(candidate);
    }
    return substitutable;
}
void RemoteStore::querySubstitutablePathInfos(const PathSet & paths,
SubstitutablePathInfos & infos)
{
if (paths.empty()) return;
auto conn(connections->get());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 3) return;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
for (auto & i : paths) {
SubstitutablePathInfo info;
conn->to << wopQuerySubstitutablePathInfo << i;
conn->processStderr();
unsigned int reply = readInt(conn->from);
if (reply == 0) continue;
info.deriver = readString(conn->from);
if (info.deriver != "") assertStorePath(info.deriver);
info.references = readStorePaths<PathSet>(conn->from);
info.downloadSize = readLongLong(conn->from);
info.narSize = GET_PROTOCOL_MINOR(conn->daemonVersion) >= 7 ? readLongLong(conn->from) : 0;
infos[i] = info;
}
} else {
conn->to << wopQuerySubstitutablePathInfos << paths;
conn->processStderr();
unsigned int count = readInt(conn->from);
for (unsigned int n = 0; n < count; n++) {
Path path = readStorePath(conn->from);
SubstitutablePathInfo & info(infos[path]);
info.deriver = readString(conn->from);
if (info.deriver != "") assertStorePath(info.deriver);
info.references = readStorePaths<PathSet>(conn->from);
info.downloadSize = readLongLong(conn->from);
info.narSize = readLongLong(conn->from);
}
}
}
/* Send wopQueryPathInfo for `path` and build a ValidPathInfo from the
   reply fields, read in order: deriver, NAR hash (parsed as SHA-256),
   references, registration time, NAR size and, for daemons with
   protocol minor >= 16, the `ultimate` flag and the signatures. */
std::shared_ptr<ValidPathInfo> RemoteStore::queryPathInfoUncached(const Path & path)
{
    auto conn(connections->get());

    conn->to << wopQueryPathInfo << path;
    conn->processStderr();

    auto result = std::make_shared<ValidPathInfo>();
    result->path = path;

    result->deriver = readString(conn->from);
    /* An empty deriver is allowed and is not validated. */
    if (result->deriver != "")
        assertStorePath(result->deriver);

    result->narHash = parseHash(htSHA256, readString(conn->from));
    result->references = readStorePaths<PathSet>(conn->from);
    result->registrationTime = readInt(conn->from);
    result->narSize = readLongLong(conn->from);

    if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) {
        result->ultimate = readInt(conn->from) != 0;
        result->sigs = readStrings<StringSet>(conn->from);
    }

    return result;
}
void RemoteStore::queryReferrers(const Path & path,
PathSet & referrers)
{
auto conn(connections->get());
conn->to << wopQueryReferrers << path;
conn->processStderr();
PathSet referrers2 = readStorePaths<PathSet>(conn->from);
referrers.insert(referrers2.begin(), referrers2.end());
}
/* Send wopQueryValidDerivers for `path` and return the store paths in
   the daemon's reply. */
PathSet RemoteStore::queryValidDerivers(const Path & path)
{
    auto conn(connections->get());

    conn->to << wopQueryValidDerivers << path;
    conn->processStderr();

    return readStorePaths<PathSet>(conn->from);
}
/* Send wopQueryDerivationOutputs for `path` and return the store
   paths in the daemon's reply. */
PathSet RemoteStore::queryDerivationOutputs(const Path & path)
{
    auto conn(connections->get());

    conn->to << wopQueryDerivationOutputs << path;
    conn->processStderr();

    return readStorePaths<PathSet>(conn->from);
}
/* Send wopQueryDerivationOutputNames for `path` and return the
   strings in the daemon's reply. Unlike the path-returning queries,
   the elements are read with readStrings() and are not checked with
   assertStorePath(). */
PathSet RemoteStore::queryDerivationOutputNames(const Path & path)
{
    auto conn(connections->get());

    conn->to << wopQueryDerivationOutputNames << path;
    conn->processStderr();

    return readStrings<PathSet>(conn->from);
}
/* Send wopQueryPathFromHashPart for `hashPart` and return the
   daemon's reply. A non-empty reply is checked with
   assertStorePath(); an empty reply is returned as is. */
Path RemoteStore::queryPathFromHashPart(const string & hashPart)
{
    auto conn(connections->get());

    conn->to << wopQueryPathFromHashPart << hashPart;
    conn->processStderr();

    Path found = readString(conn->from);
    if (!found.empty())
        assertStorePath(found);
    return found;
}
/* Send the file system object at `_srcPath` (made absolute first) to
   the daemon with wopAddToStore and return the store path the daemon
   replies with.

   Throws Error if `repair` is set. If dumping the path fails with a
   SysError whose errno is EPIPE, processStderr() is called once more
   (ignoring EndOfFile) before the SysError is rethrown. */
Path RemoteStore::addToStore(const string & name, const Path & _srcPath,
    bool recursive, HashType hashAlgo, PathFilter & filter, bool repair)
{
    if (repair) throw Error("repairing is not supported when building through the Nix daemon");

    auto conn(connections->get());

    Path srcPath(absPath(_srcPath));

    const int fixedFlag = (hashAlgo == htSHA256 && recursive) ? 0 : 1;
    const int recursiveFlag = recursive ? 1 : 0;

    conn->to << wopAddToStore << name
             << fixedFlag /* backwards compatibility hack */
             << recursiveFlag
             << printHashType(hashAlgo);

    try {
        conn->to.written = 0;
        conn->to.warn = true;
        dumpPath(srcPath, conn->to, filter);
        conn->to.warn = false;
        conn->processStderr();
    } catch (SysError & e) {
        /* Daemon closed while we were sending the path. Probably OOM
           or I/O error. */
        if (e.errNo == EPIPE) {
            try {
                conn->processStderr();
            } catch (EndOfFile &) { }
        }
        throw;
    }

    return readStorePath(conn->from);
}
/* Send wopAddTextToStore with `name`, the text `s` and `references`,
   and return the store path from the daemon's reply. Throws Error if
   `repair` is set. */
Path RemoteStore::addTextToStore(const string & name, const string & s,
    const PathSet & references, bool repair)
{
    if (repair)
        throw Error("repairing is not supported when building through the Nix daemon");

    auto conn(connections->get());

    conn->to << wopAddTextToStore << name << s << references;
    conn->processStderr();

    return readStorePath(conn->from);
}
/* Send wopExportPath for `path`. The exported data arrives as
   STDERR_WRITE messages, which processStderr() forwards to `sink`;
   the trailing integer reply is read and discarded. */
void RemoteStore::exportPath(const Path & path, bool sign,
    Sink & sink)
{
    auto conn(connections->get());

    const int signFlag = sign ? 1 : 0;
    conn->to << wopExportPath << path << signFlag;

    conn->processStderr(&sink); /* sink receives the actual data */
    readInt(conn->from);
}
/* Send wopImportPaths; processStderr() supplies data from `source`
   whenever the daemon asks for it (STDERR_READ). Returns the store
   paths listed in the daemon's reply. `accessor` is not used here. */
Paths RemoteStore::importPaths(bool requireSignature, Source & source,
    std::shared_ptr<FSAccessor> accessor)
{
    auto conn(connections->get());

    conn->to << wopImportPaths;

    /* We ignore requireSignature, since the worker forces it to true
       anyway. */
    conn->processStderr(nullptr, &source);

    return readStorePaths<Paths>(conn->from);
}
/* Ask the daemon to build `drvPaths` (wopBuildPaths).

   - Protocol minor >= 15: `buildMode` is sent to the daemon.
   - Protocol minor 13 or 14: the daemon takes no build mode, so any
     mode other than bmNormal is rejected here with an Error.
   - Protocol minor < 13: output identifiers (everything from the
     first '!') are stripped from each path; `buildMode` is neither
     sent nor checked, as before.

   The client-side build mode check runs before anything is written
   to the connection. Throwing after the opcode and paths had been
   queued in `conn->to` would leave a half-written request on a
   connection that goes back to the pool, putting the next operation
   on it out of sync with the daemon. */
void RemoteStore::buildPaths(const PathSet & drvPaths, BuildMode buildMode)
{
    auto conn(connections->get());

    const auto daemonMinor = GET_PROTOCOL_MINOR(conn->daemonVersion);

    /* Old daemons did not take a 'buildMode' parameter, so we need to
       validate it here on the client side. */
    if (daemonMinor >= 13 && daemonMinor < 15 && buildMode != bmNormal)
        throw Error("repairing or checking is not supported when building through the Nix daemon");

    conn->to << wopBuildPaths;
    if (daemonMinor >= 13) {
        conn->to << drvPaths;
        if (daemonMinor >= 15)
            conn->to << buildMode;
    } else {
        /* For backwards compatibility with old daemons, strip output
           identifiers. */
        PathSet drvPaths2;
        for (auto & i : drvPaths)
            drvPaths2.insert(string(i, 0, i.find('!')));
        conn->to << drvPaths2;
    }
    conn->processStderr();
    readInt(conn->from);
}
/* Send wopBuildDerivation with `drvPath`, the derivation `drv` and
   `buildMode`, then read the status code and error message from the
   reply into a BuildResult. */
BuildResult RemoteStore::buildDerivation(const Path & drvPath, const BasicDerivation & drv,
    BuildMode buildMode)
{
    auto conn(connections->get());

    conn->to << wopBuildDerivation << drvPath << drv << buildMode;
    conn->processStderr();

    unsigned int rawStatus;
    BuildResult outcome;
    conn->from >> rawStatus >> outcome.errorMsg;
    outcome.status = static_cast<BuildResult::Status>(rawStatus);

    return outcome;
}
/* Send wopEnsurePath for `path`, wait for the daemon to finish and
   discard its integer reply. */
void RemoteStore::ensurePath(const Path & path)
{
    auto conn(connections->get());

    conn->to << wopEnsurePath << path;
    conn->processStderr();

    readInt(conn->from);
}
/* Send wopAddTempRoot for `path`, wait for the daemon to finish and
   discard its integer reply. */
void RemoteStore::addTempRoot(const Path & path)
{
    auto conn(connections->get());

    conn->to << wopAddTempRoot << path;
    conn->processStderr();

    readInt(conn->from);
}
/* Send wopAddIndirectRoot for `path`, wait for the daemon to finish
   and discard its integer reply. */
void RemoteStore::addIndirectRoot(const Path & path)
{
    auto conn(connections->get());

    conn->to << wopAddIndirectRoot << path;
    conn->processStderr();

    readInt(conn->from);
}
/* Send wopSyncWithGC, wait for the daemon to finish and discard its
   integer reply. */
void RemoteStore::syncWithGC()
{
    auto conn(connections->get());

    conn->to << wopSyncWithGC;
    conn->processStderr();

    readInt(conn->from);
}
/* Send wopFindRoots and collect the reply: a count followed by that
   many (link, target) pairs. Each target is checked with
   readStorePath(); links are read as plain strings. */
Roots RemoteStore::findRoots()
{
    auto conn(connections->get());

    conn->to << wopFindRoots;
    conn->processStderr();

    unsigned int total = readInt(conn->from);
    Roots roots;
    for (unsigned int k = 0; k < total; ++k) {
        Path link = readString(conn->from);
        Path target = readStorePath(conn->from);
        roots[link] = target;
    }
    return roots;
}
/* Send wopCollectGarbage with the fields of `options` and store the
   reply (the list of paths and the number of bytes freed) in
   `results`. Afterwards the path info cache is emptied and its size
   statistic is reset. */
void RemoteStore::collectGarbage(const GCOptions & options, GCResults & results)
{
    auto conn(connections->get());

    conn->to << wopCollectGarbage
             << options.action
             << options.pathsToDelete
             << options.ignoreLiveness
             << options.maxFreed
             << 0;
    if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 5) {
        /* removed options */
        conn->to << 0 << 0;
    }

    conn->processStderr();

    results.paths = readStrings<PathSet>(conn->from);
    results.bytesFreed = readLongLong(conn->from);
    readLongLong(conn->from); // obsolete

    {
        /* Clear the cache while holding the lock on Store::state. */
        auto state_(Store::state.lock());
        state_->pathInfoCache.clear();
        stats.pathInfoCacheSize = 0;
    }
}
/* Send wopOptimiseStore, wait for the daemon to finish and discard
   its integer reply. */
void RemoteStore::optimiseStore()
{
    auto conn(connections->get());

    conn->to << wopOptimiseStore;
    conn->processStderr();

    readInt(conn->from);
}
/* Send wopVerifyStore with the two flags and return true iff the
   daemon's integer reply is non-zero. */
bool RemoteStore::verifyStore(bool checkContents, bool repair)
{
    auto conn(connections->get());

    conn->to << wopVerifyStore << checkContents << repair;
    conn->processStderr();

    return readInt(conn->from) != 0;
}
/* Send wopAddSignatures with `storePath` and `sigs`, wait for the
   daemon to finish and discard its integer reply. */
void RemoteStore::addSignatures(const Path & storePath, const StringSet & sigs)
{
    auto conn(connections->get());

    conn->to << wopAddSignatures << storePath << sigs;
    conn->processStderr();

    readInt(conn->from);
}
/* Flush any pending output and close the descriptor. Exceptions
   raised while doing so are passed to ignoreException() so that none
   escapes the destructor. */
RemoteStore::Connection::~Connection()
{
    try {
        to.flush();
        fd.close();
    }
    catch (...) {
        ignoreException();
    }
}
/* Flush pending output, then handle messages from the daemon until a
   terminating one arrives:

   - STDERR_WRITE: a data string, forwarded to `sink` (Error "no sink"
     if none was given);
   - STDERR_READ: a request for up to `len` bytes, answered from
     `source` (Error "no source" if none was given);
   - STDERR_NEXT: a string that is written to our standard error.

   The loop ends on any other message. STDERR_ERROR is turned into an
   Error carrying the daemon's message (and its status for protocol
   minor >= 8, otherwise 1); anything but STDERR_LAST is a protocol
   error. */
void RemoteStore::Connection::processStderr(Sink * sink, Source * source)
{
    to.flush();

    unsigned int msg;
    for (;;) {
        msg = readInt(from);

        if (msg == STDERR_WRITE) {
            /* The payload is read before the sink check. */
            string data = readString(from);
            if (!sink) throw Error("no sink");
            (*sink)((const unsigned char *) data.data(), data.size());
        }
        else if (msg == STDERR_READ) {
            if (!source) throw Error("no source");
            size_t len = readInt(from);
            unsigned char * buf = new unsigned char[len];
            AutoDeleteArray<unsigned char> d(buf);
            /* Send back however many bytes the source produced. */
            writeString(buf, source->read(buf, len), to);
            to.flush();
        }
        else if (msg == STDERR_NEXT) {
            string text = readString(from);
            writeToStderr(text);
        }
        else
            break;
    }

    if (msg == STDERR_ERROR) {
        string error = readString(from);
        unsigned int status = GET_PROTOCOL_MINOR(daemonVersion) >= 8 ? readInt(from) : 1;
        throw Error(format("%1%") % error, status);
    }
    else if (msg != STDERR_LAST)
        throw Error("protocol error processing standard error");
}
}