This commit is contained in:
Eelco Dolstra 2016-11-09 18:57:22 +01:00
commit 4b8f1b0ec0
15 changed files with 465 additions and 91 deletions

View file

@ -6,9 +6,9 @@
#include "globals.hh"
#include "nar-info.hh"
#include "sync.hh"
#include "worker-protocol.hh"
#include "nar-accessor.hh"
#include "remote-fs-accessor.hh"
#include "nar-info-disk-cache.hh"
#include "nar-accessor.hh"
#include "json.hh"
#include <chrono>
@ -379,8 +379,7 @@ Path BinaryCacheStore::addTextToStore(const string & name, const string & s,
ref<FSAccessor> BinaryCacheStore::getFSAccessor()
{
return make_ref<BinaryCacheStoreAccessor>(ref<BinaryCacheStore>(
std::dynamic_pointer_cast<BinaryCacheStore>(shared_from_this())));
return make_ref<RemoteFSAccessor>(ref<Store>(shared_from_this()));
}
}

View file

@ -37,7 +37,8 @@ namespace nix {
LocalStore::LocalStore(const Params & params)
: LocalFSStore(params)
: Store(params)
, LocalFSStore(params)
, realStoreDir(get(params, "real", rootDir != "" ? rootDir + "/nix/store" : storeDir))
, dbDir(stateDir + "/db")
, linksDir(realStoreDir + "/.links")

View file

@ -0,0 +1,57 @@
#include "remote-fs-accessor.hh"
#include "nar-accessor.hh"
namespace nix {
RemoteFSAccessor::RemoteFSAccessor(ref<Store> store)
: store(store)
{
}
std::pair<ref<FSAccessor>, Path> RemoteFSAccessor::fetch(const Path & path_)
{
auto path = canonPath(path_);
auto storePath = store->toStorePath(path);
std::string restPath = std::string(path, storePath.size());
if (!store->isValidPath(storePath))
throw InvalidPath(format("path %1% is not a valid store path") % storePath);
auto i = nars.find(storePath);
if (i != nars.end()) return {i->second, restPath};
StringSink sink;
store->narFromPath(storePath, sink);
auto accessor = makeNarAccessor(sink.s);
nars.emplace(storePath, accessor);
return {accessor, restPath};
}
FSAccessor::Stat RemoteFSAccessor::stat(const Path & path)
{
auto res = fetch(path);
return res.first->stat(res.second);
}
StringSet RemoteFSAccessor::readDirectory(const Path & path)
{
auto res = fetch(path);
return res.first->readDirectory(res.second);
}
std::string RemoteFSAccessor::readFile(const Path & path)
{
auto res = fetch(path);
return res.first->readFile(res.second);
}
std::string RemoteFSAccessor::readLink(const Path & path)
{
auto res = fetch(path);
return res.first->readLink(res.second);
}
}

View file

@ -0,0 +1,29 @@
#pragma once
#include "fs-accessor.hh"
#include "ref.hh"
#include "store-api.hh"
namespace nix {
class RemoteFSAccessor : public FSAccessor
{
ref<Store> store;
std::map<Path, ref<FSAccessor>> nars;
std::pair<ref<FSAccessor>, Path> fetch(const Path & path_);
public:
RemoteFSAccessor(ref<Store> store);
Stat stat(const Path & path) override;
StringSet readDirectory(const Path & path) override;
std::string readFile(const Path & path) override;
std::string readLink(const Path & path) override;
};
}

View file

@ -38,9 +38,9 @@ template<class T> T readStorePaths(Store & store, Source & from)
template PathSet readStorePaths(Store & store, Source & from);
/* TODO: Separate these store impls into different files, give them better names */
RemoteStore::RemoteStore(const Params & params, size_t maxConnections)
: LocalFSStore(params)
: Store(params)
, connections(make_ref<Pool<Connection>>(
maxConnections,
[this]() { return openConnection(); },
@ -50,13 +50,21 @@ RemoteStore::RemoteStore(const Params & params, size_t maxConnections)
}
std::string RemoteStore::getUri()
UDSRemoteStore::UDSRemoteStore(const Params & params, size_t maxConnections)
: Store(params)
, LocalFSStore(params)
, RemoteStore(params, maxConnections)
{
}
std::string UDSRemoteStore::getUri()
{
return "daemon";
}
ref<RemoteStore::Connection> RemoteStore::openConnection()
ref<RemoteStore::Connection> UDSRemoteStore::openConnection()
{
auto conn = make_ref<Connection>();
@ -84,46 +92,52 @@ ref<RemoteStore::Connection> RemoteStore::openConnection()
conn->from.fd = conn->fd.get();
conn->to.fd = conn->fd.get();
initConnection(*conn);
return conn;
}
void RemoteStore::initConnection(Connection & conn)
{
/* Send the magic greeting, check for the reply. */
try {
conn->to << WORKER_MAGIC_1;
conn->to.flush();
unsigned int magic = readInt(conn->from);
conn.to << WORKER_MAGIC_1;
conn.to.flush();
unsigned int magic = readInt(conn.from);
if (magic != WORKER_MAGIC_2) throw Error("protocol mismatch");
conn->daemonVersion = readInt(conn->from);
if (GET_PROTOCOL_MAJOR(conn->daemonVersion) != GET_PROTOCOL_MAJOR(PROTOCOL_VERSION))
conn.daemonVersion = readInt(conn.from);
if (GET_PROTOCOL_MAJOR(conn.daemonVersion) != GET_PROTOCOL_MAJOR(PROTOCOL_VERSION))
throw Error("Nix daemon protocol version not supported");
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 10)
if (GET_PROTOCOL_MINOR(conn.daemonVersion) < 10)
throw Error("the Nix daemon version is too old");
conn->to << PROTOCOL_VERSION;
conn.to << PROTOCOL_VERSION;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 14) {
if (GET_PROTOCOL_MINOR(conn.daemonVersion) >= 14) {
int cpu = settings.lockCPU ? lockToCurrentCPU() : -1;
if (cpu != -1)
conn->to << 1 << cpu;
conn.to << 1 << cpu;
else
conn->to << 0;
conn.to << 0;
}
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 11)
conn->to << false;
if (GET_PROTOCOL_MINOR(conn.daemonVersion) >= 11)
conn.to << false;
conn->processStderr();
conn.processStderr();
}
catch (Error & e) {
throw Error(format("cannot start daemon worker: %1%") % e.msg());
}
setOptions(conn);
return conn;
}
void RemoteStore::setOptions(ref<Connection> conn)
void RemoteStore::setOptions(Connection & conn)
{
conn->to << wopSetOptions
conn.to << wopSetOptions
<< settings.keepFailed
<< settings.keepGoing
<< settings.tryFallback
@ -137,16 +151,16 @@ void RemoteStore::setOptions(ref<Connection> conn)
<< settings.buildCores
<< settings.useSubstitutes;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 12) {
if (GET_PROTOCOL_MINOR(conn.daemonVersion) >= 12) {
Settings::SettingsMap overrides = settings.getOverrides();
if (overrides["ssh-auth-sock"] == "")
overrides["ssh-auth-sock"] = getEnv("SSH_AUTH_SOCK");
conn->to << overrides.size();
conn.to << overrides.size();
for (auto & i : overrides)
conn->to << i.first << i.second;
conn.to << i.first << i.second;
}
conn->processStderr();
conn.processStderr();
}
@ -336,6 +350,8 @@ void RemoteStore::addToStore(const ValidPathInfo & info, const ref<std::string>
bool repair, bool dontCheckSigs, std::shared_ptr<FSAccessor> accessor)
{
auto conn(connections->get());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 18) {
conn->to << wopImportPaths;
StringSink sink;
@ -357,6 +373,16 @@ void RemoteStore::addToStore(const ValidPathInfo & info, const ref<std::string>
auto importedPaths = readStorePaths<PathSet>(*this, conn->from);
assert(importedPaths.size() <= 1);
}
else {
conn->to << wopAddToStoreNar
<< info.path << info.deriver << printHash(info.narHash)
<< info.references << info.registrationTime << info.narSize
<< info.ultimate << info.sigs << *nar << repair << dontCheckSigs;
// FIXME: don't send nar as a string
conn->processStderr();
}
}
@ -553,7 +579,6 @@ RemoteStore::Connection::~Connection()
{
try {
to.flush();
fd = -1;
} catch (...) {
ignoreException();
}

View file

@ -18,7 +18,7 @@ template<typename T> class Pool;
/* FIXME: RemoteStore is a misnomer - should be something like
DaemonStore. */
class RemoteStore : public LocalFSStore
class RemoteStore : public virtual Store
{
public:
@ -26,8 +26,6 @@ public:
/* Implementations of abstract store API methods. */
std::string getUri() override;
bool isValidPathUncached(const Path & path) override;
PathSet queryValidPaths(const PathSet & paths) override;
@ -87,25 +85,46 @@ public:
void addSignatures(const Path & storePath, const StringSet & sigs) override;
private:
protected:
struct Connection
{
AutoCloseFD fd;
FdSink to;
FdSource from;
unsigned int daemonVersion;
~Connection();
virtual ~Connection();
void processStderr(Sink * sink = 0, Source * source = 0);
};
virtual ref<Connection> openConnection() = 0;
void initConnection(Connection & conn);
ref<Pool<Connection>> connections;
ref<Connection> openConnection();
private:
void setOptions(ref<Connection> conn);
void setOptions(Connection & conn);
};
class UDSRemoteStore : public LocalFSStore, public RemoteStore
{
public:
UDSRemoteStore(const Params & params, size_t maxConnections = std::numeric_limits<size_t>::max());
std::string getUri() override;
private:
struct Connection : RemoteStore::Connection
{
AutoCloseFD fd;
};
ref<RemoteStore::Connection> openConnection() override;
};

130
src/libstore/ssh-store.cc Normal file
View file

@ -0,0 +1,130 @@
#include "store-api.hh"
#include "remote-store.hh"
#include "remote-fs-accessor.hh"
#include "archive.hh"
#include "worker-protocol.hh"
#include "pool.hh"
namespace nix {
class SSHStore : public RemoteStore
{
public:
SSHStore(string uri, const Params & params, size_t maxConnections = std::numeric_limits<size_t>::max());
std::string getUri() override;
void narFromPath(const Path & path, Sink & sink) override;
ref<FSAccessor> getFSAccessor() override;
private:
struct Connection : RemoteStore::Connection
{
Pid sshPid;
AutoCloseFD out;
AutoCloseFD in;
};
ref<RemoteStore::Connection> openConnection() override;
AutoDelete tmpDir;
Path socketPath;
Pid sshMaster;
string uri;
Path key;
};
SSHStore::SSHStore(string uri, const Params & params, size_t maxConnections)
: Store(params)
, RemoteStore(params, maxConnections)
, tmpDir(createTempDir("", "nix", true, true, 0700))
, socketPath((Path) tmpDir + "/ssh.sock")
, uri(std::move(uri))
, key(get(params, "ssh-key", ""))
{
}
string SSHStore::getUri()
{
return "ssh://" + uri;
}
class ForwardSource : public Source
{
Source & readSource;
Sink & writeSink;
public:
ForwardSource(Source & readSource, Sink & writeSink) : readSource(readSource), writeSink(writeSink) {}
size_t read(unsigned char * data, size_t len) override
{
auto res = readSource.read(data, len);
writeSink(data, len);
return res;
}
};
void SSHStore::narFromPath(const Path & path, Sink & sink)
{
auto conn(connections->get());
conn->to << wopNarFromPath << path;
conn->processStderr();
ParseSink ps;
auto fwd = ForwardSource(conn->from, sink);
parseDump(ps, fwd);
}
ref<FSAccessor> SSHStore::getFSAccessor()
{
return make_ref<RemoteFSAccessor>(ref<Store>(shared_from_this()));
}
ref<RemoteStore::Connection> SSHStore::openConnection()
{
if ((pid_t) sshMaster == -1) {
sshMaster = startProcess([&]() {
if (key.empty())
execlp("ssh", "ssh", "-N", "-M", "-S", socketPath.c_str(), uri.c_str(), NULL);
else
execlp("ssh", "ssh", "-N", "-M", "-S", socketPath.c_str(), "-i", key.c_str(), uri.c_str(), NULL);
throw SysError("starting ssh master");
});
}
auto conn = make_ref<Connection>();
Pipe in, out;
in.create();
out.create();
conn->sshPid = startProcess([&]() {
if (dup2(in.readSide.get(), STDIN_FILENO) == -1)
throw SysError("duping over STDIN");
if (dup2(out.writeSide.get(), STDOUT_FILENO) == -1)
throw SysError("duping over STDOUT");
execlp("ssh", "ssh", "-S", socketPath.c_str(), uri.c_str(), "nix-daemon", "--stdio", NULL);
throw SysError("executing nix-daemon --stdio over ssh");
});
in.readSide = -1;
out.writeSide = -1;
conn->out = std::move(out.readSide);
conn->in = std::move(in.writeSide);
conn->to = FdSink(conn->in.get());
conn->from = FdSource(conn->out.get());
initConnection(*conn);
return conn;
}
static RegisterStoreImplementation regStore([](
const std::string & uri, const Store::Params & params)
-> std::shared_ptr<Store>
{
if (std::string(uri, 0, 6) != "ssh://") return 0;
return std::make_shared<SSHStore>(uri.substr(6), params);
});
}

View file

@ -606,7 +606,7 @@ namespace nix {
RegisterStoreImplementation::Implementations * RegisterStoreImplementation::implementations = 0;
ref<Store> openStoreAt(const std::string & uri_)
ref<Store> openStore(const std::string & uri_)
{
auto uri(uri_);
Store::Params params;
@ -629,9 +629,22 @@ ref<Store> openStoreAt(const std::string & uri_)
}
ref<Store> openStore()
StoreType getStoreType(const std::string & uri, const std::string & stateDir)
{
return openStoreAt(getEnv("NIX_REMOTE"));
if (uri == "daemon") {
return tDaemon;
} else if (uri == "local") {
return tLocal;
} else if (uri == "") {
if (access(stateDir.c_str(), R_OK | W_OK) == 0)
return tLocal;
else if (pathExists(settings.nixDaemonSocketFile))
return tDaemon;
else
return tLocal;
} else {
return tOther;
}
}
@ -639,26 +652,14 @@ static RegisterStoreImplementation regStore([](
const std::string & uri, const Store::Params & params)
-> std::shared_ptr<Store>
{
enum { mDaemon, mLocal, mAuto } mode;
if (uri == "daemon") mode = mDaemon;
else if (uri == "local") mode = mLocal;
else if (uri == "") mode = mAuto;
else return 0;
if (mode == mAuto) {
auto stateDir = get(params, "state", settings.nixStateDir);
if (access(stateDir.c_str(), R_OK | W_OK) == 0)
mode = mLocal;
else if (pathExists(settings.nixDaemonSocketFile))
mode = mDaemon;
else
mode = mLocal;
switch (getStoreType(uri, get(params, "state", settings.nixStateDir))) {
case tDaemon:
return std::shared_ptr<Store>(std::make_shared<UDSRemoteStore>(params));
case tLocal:
return std::shared_ptr<Store>(std::make_shared<LocalStore>(params));
default:
return nullptr;
}
return mode == mDaemon
? std::shared_ptr<Store>(std::make_shared<RemoteStore>(params))
: std::shared_ptr<Store>(std::make_shared<LocalStore>(params));
});
@ -679,7 +680,7 @@ std::list<ref<Store>> getDefaultSubstituters()
auto addStore = [&](const std::string & uri) {
if (done.count(uri)) return;
done.insert(uri);
state->stores.push_back(openStoreAt(uri));
state->stores.push_back(openStore(uri));
};
for (auto uri : settings.get("substituters", Strings()))

View file

@ -5,6 +5,7 @@
#include "crypto.hh"
#include "lru-cache.hh"
#include "sync.hh"
#include "globals.hh"
#include <atomic>
#include <limits>
@ -537,7 +538,7 @@ protected:
};
class LocalFSStore : public Store
class LocalFSStore : public virtual Store
{
public:
const Path rootDir;
@ -604,13 +605,18 @@ void removeTempRoots();
If uri is empty, it defaults to direct or daemon depending on
whether the user has write access to the local Nix store/database.
set to true *unless* you're going to collect garbage. */
ref<Store> openStoreAt(const std::string & uri);
ref<Store> openStore(const std::string & uri = getEnv("NIX_REMOTE"));
/* Open the store indicated by the NIX_REMOTE environment variable. */
ref<Store> openStore();
enum StoreType {
tDaemon,
tLocal,
tOther
};
StoreType getStoreType(const std::string & uri = getEnv("NIX_REMOTE"), const std::string & stateDir = settings.nixStateDir);
/* Return the default substituter stores, defined by the
substituters option and various legacy options like
binary-caches. */

View file

@ -6,7 +6,7 @@ namespace nix {
#define WORKER_MAGIC_1 0x6e697863
#define WORKER_MAGIC_2 0x6478696f
#define PROTOCOL_VERSION 0x111
#define PROTOCOL_VERSION 0x112
#define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00)
#define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff)
@ -46,6 +46,8 @@ typedef enum {
wopVerifyStore = 35,
wopBuildDerivation = 36,
wopAddSignatures = 37,
wopNarFromPath = 38,
wopAddToStoreNar = 39
} WorkerOp;

View file

@ -22,6 +22,7 @@
#include <errno.h>
#include <pwd.h>
#include <grp.h>
#include <fcntl.h>
#if __APPLE__ || __FreeBSD__
#include <sys/ucred.h>
@ -29,6 +30,25 @@
using namespace nix;
#ifndef __linux__
#define SPLICE_F_MOVE 0
static ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags)
{
/* We ignore most parameters, we just have them for conformance with the linux syscall */
char buf[8192];
auto read_count = read(fd_in, buf, sizeof(buf));
if (read_count == -1)
return read_count;
auto write_count = decltype<read_count>(0);
while (write_count < read_count) {
auto res = write(fd_out, buf + write_count, read_count - write_count);
if (res == -1)
return res;
write_count += res;
}
return read_count;
}
#endif
static FdSource from(STDIN_FILENO);
static FdSink to(STDOUT_FILENO);
@ -556,6 +576,37 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
break;
}
case wopNarFromPath: {
auto path = readStorePath(*store, from);
startWork();
stopWork();
dumpPath(path, to);
break;
}
case wopAddToStoreNar: {
ValidPathInfo info;
info.path = readStorePath(*store, from);
info.deriver = readString(from);
if (!info.deriver.empty())
store->assertStorePath(info.deriver);
info.narHash = parseHash(htSHA256, readString(from));
info.references = readStorePaths<PathSet>(*store, from);
info.registrationTime = readInt(from);
info.narSize = readLongLong(from);
info.ultimate = readLongLong(from);
info.sigs = readStrings<StringSet>(from);
auto nar = make_ref<std::string>(readString(from));
auto repair = readInt(from) ? true : false;
auto dontCheckSigs = readInt(from) ? true : false;
if (!trusted && dontCheckSigs)
dontCheckSigs = false;
startWork();
store->addToStore(info, nar, repair, dontCheckSigs, nullptr);
stopWork();
break;
}
default:
throw Error(format("invalid operation %1%") % op);
}
@ -885,6 +936,8 @@ int main(int argc, char * * argv)
return handleExceptions(argv[0], [&]() {
initNix();
auto stdio = false;
parseCmdLine(argc, argv, [&](Strings::iterator & arg, const Strings::iterator & end) {
if (*arg == "--daemon")
; /* ignored for backwards compatibility */
@ -892,10 +945,62 @@ int main(int argc, char * * argv)
showManPage("nix-daemon");
else if (*arg == "--version")
printVersion("nix-daemon");
else if (*arg == "--stdio")
stdio = true;
else return false;
return true;
});
if (stdio) {
if (getStoreType() == tDaemon) {
/* Forward on this connection to the real daemon */
auto socketPath = settings.nixDaemonSocketFile;
auto s = socket(PF_UNIX, SOCK_STREAM, 0);
if (s == -1)
throw SysError("creating Unix domain socket");
auto socketDir = dirOf(socketPath);
if (chdir(socketDir.c_str()) == -1)
throw SysError(format("changing to socket directory %1%") % socketDir);
auto socketName = baseNameOf(socketPath);
auto addr = sockaddr_un{};
addr.sun_family = AF_UNIX;
if (socketName.size() + 1 >= sizeof(addr.sun_path))
throw Error(format("socket name %1% is too long") % socketName);
strcpy(addr.sun_path, socketName.c_str());
if (connect(s, (struct sockaddr *) &addr, sizeof(addr)) == -1)
throw SysError(format("cannot connect to daemon at %1%") % socketPath);
auto nfds = (s > STDIN_FILENO ? s : STDIN_FILENO) + 1;
while (true) {
fd_set fds;
FD_ZERO(&fds);
FD_SET(s, &fds);
FD_SET(STDIN_FILENO, &fds);
if (select(nfds, &fds, nullptr, nullptr, nullptr) == -1)
throw SysError("waiting for data from client or server");
if (FD_ISSET(s, &fds)) {
auto res = splice(s, nullptr, STDOUT_FILENO, nullptr, SIZE_MAX, SPLICE_F_MOVE);
if (res == -1)
throw SysError("splicing data from daemon socket to stdout");
else if (res == 0)
throw EndOfFile("unexpected EOF from daemon socket");
}
if (FD_ISSET(STDIN_FILENO, &fds)) {
auto res = splice(STDIN_FILENO, nullptr, s, nullptr, SIZE_MAX, SPLICE_F_MOVE);
if (res == -1)
throw SysError("splicing data from stdin to daemon socket");
else if (res == 0)
return;
}
}
} else {
processConnection(true);
}
} else {
daemonLoop(argv);
}
});
}

View file

@ -81,7 +81,7 @@ StoreCommand::StoreCommand()
void StoreCommand::run()
{
run(openStoreAt(storeUri));
run(openStore(storeUri));
}
StorePathsCommand::StorePathsCommand()

View file

@ -43,8 +43,8 @@ struct CmdCopy : StorePathsCommand
if (srcUri.empty() && dstUri.empty())
throw UsageError("you must pass --from and/or --to");
ref<Store> srcStore = srcUri.empty() ? store : openStoreAt(srcUri);
ref<Store> dstStore = dstUri.empty() ? store : openStoreAt(dstUri);
ref<Store> srcStore = srcUri.empty() ? store : openStore(srcUri);
ref<Store> dstStore = dstUri.empty() ? store : openStore(dstUri);
std::string copiedLabel = "copied";

View file

@ -35,7 +35,7 @@ struct CmdCopySigs : StorePathsCommand
// FIXME: factor out commonality with MixVerify.
std::vector<ref<Store>> substituters;
for (auto & s : substituterUris)
substituters.push_back(openStoreAt(s));
substituters.push_back(openStore(s));
ThreadPool pool;

View file

@ -52,7 +52,7 @@ struct CmdVerify : StorePathsCommand
{
std::vector<ref<Store>> substituters;
for (auto & s : substituterUris)
substituters.push_back(openStoreAt(s));
substituters.push_back(openStore(s));
auto publicKeys = getDefaultPublicKeys();