forked from lix-project/lix
Improve SSH handling
* Unify SSH code in SSHStore and LegacySSHStore. * Fix a race starting the SSH master. We now wait synchronously for the SSH master to finish starting. This prevents the SSH clients from starting their own connections. * Don't use a master if max-connections == 1. * Add a "max-connections" store parameter. * Add a "compress" store parameter.
This commit is contained in:
parent
7f62be1bcd
commit
577ebeaefb
|
@ -4,6 +4,7 @@
|
||||||
#include "serve-protocol.hh"
|
#include "serve-protocol.hh"
|
||||||
#include "store-api.hh"
|
#include "store-api.hh"
|
||||||
#include "worker-protocol.hh"
|
#include "worker-protocol.hh"
|
||||||
|
#include "ssh.hh"
|
||||||
|
|
||||||
namespace nix {
|
namespace nix {
|
||||||
|
|
||||||
|
@ -11,73 +12,42 @@ static std::string uriScheme = "legacy-ssh://";
|
||||||
|
|
||||||
struct LegacySSHStore : public Store
|
struct LegacySSHStore : public Store
|
||||||
{
|
{
|
||||||
string host;
|
|
||||||
|
|
||||||
struct Connection
|
struct Connection
|
||||||
{
|
{
|
||||||
Pid sshPid;
|
std::unique_ptr<SSHMaster::Connection> sshConn;
|
||||||
AutoCloseFD out;
|
|
||||||
AutoCloseFD in;
|
|
||||||
FdSink to;
|
FdSink to;
|
||||||
FdSource from;
|
FdSource from;
|
||||||
};
|
};
|
||||||
|
|
||||||
AutoDelete tmpDir;
|
std::string host;
|
||||||
|
|
||||||
Path socketPath;
|
|
||||||
|
|
||||||
Pid sshMaster;
|
|
||||||
|
|
||||||
ref<Pool<Connection>> connections;
|
ref<Pool<Connection>> connections;
|
||||||
|
|
||||||
Path key;
|
SSHMaster master;
|
||||||
|
|
||||||
LegacySSHStore(const string & host, const Params & params,
|
LegacySSHStore(const string & host, const Params & params)
|
||||||
size_t maxConnections = std::numeric_limits<size_t>::max())
|
|
||||||
: Store(params)
|
: Store(params)
|
||||||
, host(host)
|
, host(host)
|
||||||
, tmpDir(createTempDir("", "nix", true, true, 0700))
|
|
||||||
, socketPath((Path) tmpDir + "/ssh.sock")
|
|
||||||
, connections(make_ref<Pool<Connection>>(
|
, connections(make_ref<Pool<Connection>>(
|
||||||
maxConnections,
|
std::max(1, std::stoi(get(params, "max-connections", "1"))),
|
||||||
[this]() { return openConnection(); },
|
[this]() { return openConnection(); },
|
||||||
[](const ref<Connection> & r) { return true; }
|
[](const ref<Connection> & r) { return true; }
|
||||||
))
|
))
|
||||||
, key(get(params, "ssh-key", ""))
|
, master(
|
||||||
|
host,
|
||||||
|
get(params, "ssh-key", ""),
|
||||||
|
// Use SSH master only if using more than 1 connection.
|
||||||
|
connections->capacity() > 1,
|
||||||
|
get(params, "compress", "") == "true")
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
ref<Connection> openConnection()
|
ref<Connection> openConnection()
|
||||||
{
|
{
|
||||||
if ((pid_t) sshMaster == -1) {
|
|
||||||
sshMaster = startProcess([&]() {
|
|
||||||
restoreSignals();
|
|
||||||
Strings args{ "ssh", "-M", "-S", socketPath, "-N", "-x", "-a", host };
|
|
||||||
if (!key.empty())
|
|
||||||
args.insert(args.end(), {"-i", key});
|
|
||||||
execvp("ssh", stringsToCharPtrs(args).data());
|
|
||||||
throw SysError("starting SSH master connection to host ‘%s’", host);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
auto conn = make_ref<Connection>();
|
auto conn = make_ref<Connection>();
|
||||||
Pipe in, out;
|
conn->sshConn = master.startCommand("nix-store --serve");
|
||||||
in.create();
|
conn->to = FdSink(conn->sshConn->in.get());
|
||||||
out.create();
|
conn->from = FdSource(conn->sshConn->out.get());
|
||||||
conn->sshPid = startProcess([&]() {
|
|
||||||
if (dup2(in.readSide.get(), STDIN_FILENO) == -1)
|
|
||||||
throw SysError("duping over STDIN");
|
|
||||||
if (dup2(out.writeSide.get(), STDOUT_FILENO) == -1)
|
|
||||||
throw SysError("duping over STDOUT");
|
|
||||||
execlp("ssh", "ssh", "-S", socketPath.c_str(), host.c_str(), "nix-store", "--serve", "--write", nullptr);
|
|
||||||
throw SysError("executing ‘nix-store --serve’ on remote host ‘%s’", host);
|
|
||||||
});
|
|
||||||
in.readSide = -1;
|
|
||||||
out.writeSide = -1;
|
|
||||||
conn->out = std::move(out.readSide);
|
|
||||||
conn->in = std::move(in.writeSide);
|
|
||||||
conn->to = FdSink(conn->in.get());
|
|
||||||
conn->from = FdSource(conn->out.get());
|
|
||||||
|
|
||||||
int remoteVersion;
|
int remoteVersion;
|
||||||
|
|
||||||
|
|
|
@ -40,10 +40,10 @@ template PathSet readStorePaths(Store & store, Source & from);
|
||||||
template Paths readStorePaths(Store & store, Source & from);
|
template Paths readStorePaths(Store & store, Source & from);
|
||||||
|
|
||||||
/* TODO: Separate these store impls into different files, give them better names */
|
/* TODO: Separate these store impls into different files, give them better names */
|
||||||
RemoteStore::RemoteStore(const Params & params, size_t maxConnections)
|
RemoteStore::RemoteStore(const Params & params)
|
||||||
: Store(params)
|
: Store(params)
|
||||||
, connections(make_ref<Pool<Connection>>(
|
, connections(make_ref<Pool<Connection>>(
|
||||||
maxConnections,
|
std::max(1, std::stoi(get(params, "max-connections", "1"))),
|
||||||
[this]() { return openConnection(); },
|
[this]() { return openConnection(); },
|
||||||
[](const ref<Connection> & r) { return r->to.good() && r->from.good(); }
|
[](const ref<Connection> & r) { return r->to.good() && r->from.good(); }
|
||||||
))
|
))
|
||||||
|
@ -51,10 +51,10 @@ RemoteStore::RemoteStore(const Params & params, size_t maxConnections)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
UDSRemoteStore::UDSRemoteStore(const Params & params, size_t maxConnections)
|
UDSRemoteStore::UDSRemoteStore(const Params & params)
|
||||||
: Store(params)
|
: Store(params)
|
||||||
, LocalFSStore(params)
|
, LocalFSStore(params)
|
||||||
, RemoteStore(params, maxConnections)
|
, RemoteStore(params)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -129,7 +129,7 @@ void RemoteStore::initConnection(Connection & conn)
|
||||||
conn.processStderr();
|
conn.processStderr();
|
||||||
}
|
}
|
||||||
catch (Error & e) {
|
catch (Error & e) {
|
||||||
throw Error(format("cannot start daemon worker: %1%") % e.msg());
|
throw Error("cannot open connection to remote store ‘%s’: %s", getUri(), e.what());
|
||||||
}
|
}
|
||||||
|
|
||||||
setOptions(conn);
|
setOptions(conn);
|
||||||
|
|
|
@ -22,7 +22,7 @@ class RemoteStore : public virtual Store
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
|
||||||
RemoteStore(const Params & params, size_t maxConnections = std::numeric_limits<size_t>::max());
|
RemoteStore(const Params & params);
|
||||||
|
|
||||||
/* Implementations of abstract store API methods. */
|
/* Implementations of abstract store API methods. */
|
||||||
|
|
||||||
|
@ -113,7 +113,7 @@ class UDSRemoteStore : public LocalFSStore, public RemoteStore
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
|
||||||
UDSRemoteStore(const Params & params, size_t maxConnections = std::numeric_limits<size_t>::max());
|
UDSRemoteStore(const Params & params);
|
||||||
|
|
||||||
std::string getUri() override;
|
std::string getUri() override;
|
||||||
|
|
||||||
|
|
|
@ -4,6 +4,7 @@
|
||||||
#include "archive.hh"
|
#include "archive.hh"
|
||||||
#include "worker-protocol.hh"
|
#include "worker-protocol.hh"
|
||||||
#include "pool.hh"
|
#include "pool.hh"
|
||||||
|
#include "ssh.hh"
|
||||||
|
|
||||||
namespace nix {
|
namespace nix {
|
||||||
|
|
||||||
|
@ -13,9 +14,23 @@ class SSHStore : public RemoteStore
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
|
||||||
SSHStore(string host, const Params & params, size_t maxConnections = std::numeric_limits<size_t>::max());
|
SSHStore(const std::string & host, const Params & params)
|
||||||
|
: Store(params)
|
||||||
|
, RemoteStore(params)
|
||||||
|
, host(host)
|
||||||
|
, master(
|
||||||
|
host,
|
||||||
|
get(params, "ssh-key", ""),
|
||||||
|
// Use SSH master only if using more than 1 connection.
|
||||||
|
connections->capacity() > 1,
|
||||||
|
get(params, "compress", "") == "true")
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
std::string getUri() override;
|
std::string getUri() override
|
||||||
|
{
|
||||||
|
return uriScheme + host;
|
||||||
|
}
|
||||||
|
|
||||||
void narFromPath(const Path & path, Sink & sink) override;
|
void narFromPath(const Path & path, Sink & sink) override;
|
||||||
|
|
||||||
|
@ -25,43 +40,16 @@ private:
|
||||||
|
|
||||||
struct Connection : RemoteStore::Connection
|
struct Connection : RemoteStore::Connection
|
||||||
{
|
{
|
||||||
Pid sshPid;
|
std::unique_ptr<SSHMaster::Connection> sshConn;
|
||||||
AutoCloseFD out;
|
|
||||||
AutoCloseFD in;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
ref<RemoteStore::Connection> openConnection() override;
|
ref<RemoteStore::Connection> openConnection() override;
|
||||||
|
|
||||||
AutoDelete tmpDir;
|
std::string host;
|
||||||
|
|
||||||
Path socketPath;
|
SSHMaster master;
|
||||||
|
|
||||||
Pid sshMaster;
|
|
||||||
|
|
||||||
string host;
|
|
||||||
|
|
||||||
Path key;
|
|
||||||
|
|
||||||
bool compress;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
SSHStore::SSHStore(string host, const Params & params, size_t maxConnections)
|
|
||||||
: Store(params)
|
|
||||||
, RemoteStore(params, maxConnections)
|
|
||||||
, tmpDir(createTempDir("", "nix", true, true, 0700))
|
|
||||||
, socketPath((Path) tmpDir + "/ssh.sock")
|
|
||||||
, host(std::move(host))
|
|
||||||
, key(get(params, "ssh-key", ""))
|
|
||||||
, compress(get(params, "compress", "") == "true")
|
|
||||||
{
|
|
||||||
/* open a connection and perform the handshake to verify all is well */
|
|
||||||
connections->get();
|
|
||||||
}
|
|
||||||
|
|
||||||
string SSHStore::getUri()
|
|
||||||
{
|
|
||||||
return uriScheme + host;
|
|
||||||
}
|
|
||||||
|
|
||||||
class ForwardSource : public Source
|
class ForwardSource : public Source
|
||||||
{
|
{
|
||||||
|
@ -94,35 +82,10 @@ ref<FSAccessor> SSHStore::getFSAccessor()
|
||||||
|
|
||||||
ref<RemoteStore::Connection> SSHStore::openConnection()
|
ref<RemoteStore::Connection> SSHStore::openConnection()
|
||||||
{
|
{
|
||||||
if ((pid_t) sshMaster == -1) {
|
|
||||||
sshMaster = startProcess([&]() {
|
|
||||||
restoreSignals();
|
|
||||||
if (key.empty())
|
|
||||||
execlp("ssh", "ssh", "-N", "-M", "-S", socketPath.c_str(), host.c_str(), NULL);
|
|
||||||
else
|
|
||||||
execlp("ssh", "ssh", "-N", "-M", "-S", socketPath.c_str(), "-i", key.c_str(), host.c_str(), NULL);
|
|
||||||
throw SysError("starting ssh master");
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
auto conn = make_ref<Connection>();
|
auto conn = make_ref<Connection>();
|
||||||
Pipe in, out;
|
conn->sshConn = master.startCommand("nix-daemon --stdio");
|
||||||
in.create();
|
conn->to = FdSink(conn->sshConn->in.get());
|
||||||
out.create();
|
conn->from = FdSource(conn->sshConn->out.get());
|
||||||
conn->sshPid = startProcess([&]() {
|
|
||||||
if (dup2(in.readSide.get(), STDIN_FILENO) == -1)
|
|
||||||
throw SysError("duping over STDIN");
|
|
||||||
if (dup2(out.writeSide.get(), STDOUT_FILENO) == -1)
|
|
||||||
throw SysError("duping over STDOUT");
|
|
||||||
execlp("ssh", "ssh", "-S", socketPath.c_str(), host.c_str(), "nix-daemon", "--stdio", NULL);
|
|
||||||
throw SysError("executing nix-daemon --stdio over ssh");
|
|
||||||
});
|
|
||||||
in.readSide = -1;
|
|
||||||
out.writeSide = -1;
|
|
||||||
conn->out = std::move(out.readSide);
|
|
||||||
conn->in = std::move(in.writeSide);
|
|
||||||
conn->to = FdSink(conn->in.get());
|
|
||||||
conn->from = FdSource(conn->out.get());
|
|
||||||
initConnection(*conn);
|
initConnection(*conn);
|
||||||
return conn;
|
return conn;
|
||||||
}
|
}
|
||||||
|
|
93
src/libstore/ssh.cc
Normal file
93
src/libstore/ssh.cc
Normal file
|
@ -0,0 +1,93 @@
|
||||||
|
#include "ssh.hh"
|
||||||
|
|
||||||
|
namespace nix {
|
||||||
|
|
||||||
|
std::unique_ptr<SSHMaster::Connection> SSHMaster::startCommand(const std::string & command)
|
||||||
|
{
|
||||||
|
startMaster();
|
||||||
|
|
||||||
|
Pipe in, out;
|
||||||
|
in.create();
|
||||||
|
out.create();
|
||||||
|
|
||||||
|
auto conn = std::make_unique<Connection>();
|
||||||
|
conn->sshPid = startProcess([&]() {
|
||||||
|
restoreSignals();
|
||||||
|
|
||||||
|
close(in.writeSide.get());
|
||||||
|
close(out.readSide.get());
|
||||||
|
|
||||||
|
if (dup2(in.readSide.get(), STDIN_FILENO) == -1)
|
||||||
|
throw SysError("duping over stdin");
|
||||||
|
if (dup2(out.writeSide.get(), STDOUT_FILENO) == -1)
|
||||||
|
throw SysError("duping over stdout");
|
||||||
|
|
||||||
|
Strings args = { "ssh", host.c_str(), "-x", "-a" };
|
||||||
|
if (!keyFile.empty())
|
||||||
|
args.insert(args.end(), {"-i", keyFile});
|
||||||
|
if (compress)
|
||||||
|
args.push_back("-C");
|
||||||
|
if (useMaster)
|
||||||
|
args.insert(args.end(), {"-S", socketPath});
|
||||||
|
args.push_back(command);
|
||||||
|
execvp(args.begin()->c_str(), stringsToCharPtrs(args).data());
|
||||||
|
|
||||||
|
throw SysError("executing ‘%s’ on ‘%s’", command, host);
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
in.readSide = -1;
|
||||||
|
out.writeSide = -1;
|
||||||
|
|
||||||
|
conn->out = std::move(out.readSide);
|
||||||
|
conn->in = std::move(in.writeSide);
|
||||||
|
|
||||||
|
return conn;
|
||||||
|
}
|
||||||
|
|
||||||
|
void SSHMaster::startMaster()
|
||||||
|
{
|
||||||
|
if (!useMaster || sshMaster != -1) return;
|
||||||
|
|
||||||
|
tmpDir = std::make_unique<AutoDelete>(createTempDir("", "nix", true, true, 0700));
|
||||||
|
|
||||||
|
socketPath = (Path) *tmpDir + "/ssh.sock";
|
||||||
|
|
||||||
|
Pipe out;
|
||||||
|
out.create();
|
||||||
|
|
||||||
|
sshMaster = startProcess([&]() {
|
||||||
|
restoreSignals();
|
||||||
|
|
||||||
|
close(out.readSide.get());
|
||||||
|
|
||||||
|
if (dup2(out.writeSide.get(), STDOUT_FILENO) == -1)
|
||||||
|
throw SysError("duping over stdout");
|
||||||
|
|
||||||
|
Strings args =
|
||||||
|
{ "ssh", host.c_str(), "-M", "-N", "-S", socketPath
|
||||||
|
, "-o", "LocalCommand=echo started"
|
||||||
|
, "-o", "PermitLocalCommand=yes"
|
||||||
|
};
|
||||||
|
if (!keyFile.empty())
|
||||||
|
args.insert(args.end(), {"-i", keyFile});
|
||||||
|
if (compress)
|
||||||
|
args.push_back("-C");
|
||||||
|
|
||||||
|
execvp(args.begin()->c_str(), stringsToCharPtrs(args).data());
|
||||||
|
|
||||||
|
throw SysError("starting SSH master");
|
||||||
|
});
|
||||||
|
|
||||||
|
out.writeSide = -1;
|
||||||
|
|
||||||
|
std::string reply;
|
||||||
|
try {
|
||||||
|
reply = readLine(out.readSide.get());
|
||||||
|
} catch (EndOfFile & e) { }
|
||||||
|
|
||||||
|
if (reply != "started")
|
||||||
|
throw Error("failed to start SSH master connection to ‘%s’", host);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
41
src/libstore/ssh.hh
Normal file
41
src/libstore/ssh.hh
Normal file
|
@ -0,0 +1,41 @@
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "util.hh"
|
||||||
|
|
||||||
|
namespace nix {
|
||||||
|
|
||||||
|
class SSHMaster
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
|
||||||
|
std::string host;
|
||||||
|
std::string keyFile;
|
||||||
|
bool useMaster;
|
||||||
|
bool compress;
|
||||||
|
Pid sshMaster;
|
||||||
|
std::unique_ptr<AutoDelete> tmpDir;
|
||||||
|
Path socketPath;
|
||||||
|
|
||||||
|
public:
|
||||||
|
|
||||||
|
SSHMaster(const std::string & host, const std::string & keyFile, bool useMaster, bool compress)
|
||||||
|
: host(host)
|
||||||
|
, keyFile(keyFile)
|
||||||
|
, useMaster(useMaster)
|
||||||
|
, compress(compress)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Connection
|
||||||
|
{
|
||||||
|
Pid sshPid;
|
||||||
|
AutoCloseFD out, in;
|
||||||
|
};
|
||||||
|
|
||||||
|
std::unique_ptr<Connection> startCommand(const std::string & command);
|
||||||
|
|
||||||
|
void startMaster();
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
|
@ -141,11 +141,16 @@ public:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
unsigned int count()
|
size_t count()
|
||||||
{
|
{
|
||||||
auto state_(state.lock());
|
auto state_(state.lock());
|
||||||
return state_->idle.size() + state_->inUse;
|
return state_->idle.size() + state_->inUse;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size_t capacity()
|
||||||
|
{
|
||||||
|
return state.lock()->max;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue