nix-copy-closure: Fix race condition

There is a long-standing race condition when copying a closure to a
remote machine, particularly affecting build-remote.pl: the client
first asks the remote machine which paths it already has, then copies
over the missing paths. If the garbage collector kicks in on the
remote machine between the first and second step, the already-present
paths may be deleted. The missing paths may then refer to deleted
paths, causing nix-copy-closure to fail. The client now performs both
steps using a single remote Nix call (using ‘nix-store --serve’),
locking all paths in the closure while querying.

I changed the --serve protocol a bit (getting rid of QueryCommand), so
this breaks the SSH substituter from older versions. But it was marked
experimental anyway.

Fixes #141.
This commit is contained in:
Eelco Dolstra 2014-07-10 11:51:22 +02:00
parent 2c3a8f787b
commit 04170d06bf
4 changed files with 137 additions and 61 deletions

View file

@ -4,6 +4,15 @@ use strict;
use Nix::Config; use Nix::Config;
use Nix::Store; use Nix::Store;
use List::Util qw(sum); use List::Util qw(sum);
use IPC::Open2;
sub readInt {
my ($from) = @_;
my $resp;
sysread($from, $resp, 8) == 8 or die "did not receive valid reply from remote host\n";
return unpack("L<x4", $resp);
}
sub copyTo { sub copyTo {
@ -20,14 +29,76 @@ sub copyTo {
# Ignore exit status because this is just an optimisation. # Ignore exit status because this is just an optimisation.
} }
# Start nix-store --serve on the remote host.
my ($from, $to);
my $pid = open2($from, $to, "ssh $sshHost @{$sshOpts} nix-store --serve --write");
# Do the handshake.
eval {
my $SERVE_MAGIC_1 = 0x390c9deb; # FIXME
my $clientVersion = 0x200;
syswrite($to, pack("L<x4L<x4", $SERVE_MAGIC_1, $clientVersion)) or die;
die "did not get valid handshake from remote host\n" if readInt($from) != 0x5452eecb;
my $serverVersion = readInt($from);
die "unsupported server version\n" if $serverVersion < 0x200 || $serverVersion >= 0x300;
};
if ($@) {
chomp $@;
warn "$@; falling back to old closure copying method\n";
return oldCopyTo(\@closure, @_);
}
# Send the "query valid paths" command with the "lock" option
# enabled. This prevens a race where the remote host
# garbage-collect paths that are already there.
my $req = pack("L<x4L<x4L<x4", 1, 1, scalar @closure);
for my $s (@closure) {
my $len = length $s;
$req .= pack("L<x4", $len);
$req .= $s;
$req .= "\000" x (8 - $len % 8) if $len % 8;
}
syswrite($to, $req) or die;
# Get back the set of paths that are already valid on the remote host.
my %present;
my $n = readInt($from);
while ($n--) {
my $len = readInt($from);
my $s;
sysread($from, $s, $len) == $len or die;
$present{$s} = 1;
sysread($from, $s, 8 - $len % 8) if $len % 8; # skip padding
}
my @missing = grep { !$present{$_} } @closure;
return if !@missing;
# Send the "import paths" command.
syswrite($to, pack("L<x4", 4)) or die;
exportPaths(fileno($to), $sign, @missing);
readInt($from) == 1 or die;
# Shut down the server process.
close $to;
waitpid $pid, 0;
}
# For backwards compatibility with Nix <= 1.7. Will be removed
# eventually.
sub oldCopyTo {
my ($closure, $sshHost, $sshOpts, $storePaths, $compressor, $decompressor,
$includeOutputs, $dryRun, $sign, $progressViewer, $useSubstitutes) = @_;
# Ask the remote host which paths are invalid. Because of limits # Ask the remote host which paths are invalid. Because of limits
# to the command line length, do this in chunks. Eventually, # to the command line length, do this in chunks. Eventually,
# we'll want to use --from-stdin, but we can't rely on the # we'll want to use --from-stdin, but we can't rely on the
# target having this option yet. # target having this option yet.
my @missing = (); my @missing;
my $missingSize = 0; my $missingSize = 0;
while (scalar(@closure) > 0) { while (scalar(@$closure) > 0) {
my @ps = splice(@closure, 0, 1500); my @ps = splice(@$closure, 0, 1500);
open(READ, "set -f; ssh $sshHost @{$sshOpts} nix-store --check-validity --print-invalid @ps|"); open(READ, "set -f; ssh $sshHost @{$sshOpts} nix-store --check-validity --print-invalid @ps|");
while (<READ>) { while (<READ>) {
chomp; chomp;

View file

@ -58,7 +58,7 @@ static std::pair<FdSink, FdSource> connect(const string & conn)
static void substitute(std::pair<FdSink, FdSource> & pipes, Path storePath, Path destPath) static void substitute(std::pair<FdSink, FdSource> & pipes, Path storePath, Path destPath)
{ {
writeInt(cmdSubstitute, pipes.first); writeInt(cmdDumpStorePath, pipes.first);
writeString(storePath, pipes.first); writeString(storePath, pipes.first);
pipes.first.flush(); pipes.first.flush();
restorePath(destPath, pipes.second); restorePath(destPath, pipes.second);
@ -68,20 +68,20 @@ static void substitute(std::pair<FdSink, FdSource> & pipes, Path storePath, Path
static void query(std::pair<FdSink, FdSource> & pipes) static void query(std::pair<FdSink, FdSource> & pipes)
{ {
writeInt(cmdQuery, pipes.first);
for (string line; getline(std::cin, line);) { for (string line; getline(std::cin, line);) {
Strings tokenized = tokenizeString<Strings>(line); Strings tokenized = tokenizeString<Strings>(line);
string cmd = tokenized.front(); string cmd = tokenized.front();
tokenized.pop_front(); tokenized.pop_front();
if (cmd == "have") { if (cmd == "have") {
writeInt(qCmdHave, pipes.first); writeInt(cmdQueryValidPaths, pipes.first);
writeInt(0, pipes.first); // don't lock
writeStrings(tokenized, pipes.first); writeStrings(tokenized, pipes.first);
pipes.first.flush(); pipes.first.flush();
PathSet paths = readStrings<PathSet>(pipes.second); PathSet paths = readStrings<PathSet>(pipes.second);
foreach (PathSet::iterator, i, paths) foreach (PathSet::iterator, i, paths)
std::cout << *i << std::endl; std::cout << *i << std::endl;
} else if (cmd == "info") { } else if (cmd == "info") {
writeInt(qCmdInfo, pipes.first); writeInt(cmdQueryPathInfos, pipes.first);
writeStrings(tokenized, pipes.first); writeStrings(tokenized, pipes.first);
pipes.first.flush(); pipes.first.flush();
while (1) { while (1) {

View file

@ -869,8 +869,12 @@ static void opClearFailedPaths(Strings opFlags, Strings opArgs)
/* Serve the nix store in a way usable by a restricted ssh user. */ /* Serve the nix store in a way usable by a restricted ssh user. */
static void opServe(Strings opFlags, Strings opArgs) static void opServe(Strings opFlags, Strings opArgs)
{ {
if (!opArgs.empty() || !opFlags.empty()) bool writeAllowed = false;
throw UsageError("no arguments or flags expected"); foreach (Strings::iterator, i, opFlags)
if (*i == "--write") writeAllowed = true;
else throw UsageError(format("unknown flag `%1%'") % *i);
if (!opArgs.empty()) throw UsageError("no arguments expected");
FdSource in(STDIN_FILENO); FdSource in(STDIN_FILENO);
FdSink out(STDOUT_FILENO); FdSink out(STDOUT_FILENO);
@ -883,50 +887,56 @@ static void opServe(Strings opFlags, Strings opArgs)
out.flush(); out.flush();
readInt(in); // Client version, unused for now readInt(in); // Client version, unused for now
ServeCommand cmd = (ServeCommand) readInt(in); while (true) {
switch (cmd) { ServeCommand cmd;
case cmdQuery: try {
while (true) { cmd = (ServeCommand) readInt(in);
QueryCommand qCmd; } catch (EndOfFile & e) {
try { break;
qCmd = (QueryCommand) readInt(in); }
} catch (EndOfFile & e) {
break; switch (cmd) {
} case cmdQueryValidPaths: {
switch (qCmd) { bool lock = readInt(in);
case qCmdHave: { PathSet paths = readStorePaths<PathSet>(in);
PathSet paths = readStorePaths<PathSet>(in); if (lock && writeAllowed)
writeStrings(store->queryValidPaths(paths), out); for (auto & path : paths)
break; store->addTempRoot(path);
} writeStrings(store->queryValidPaths(paths), out);
case qCmdInfo: {
PathSet paths = readStorePaths<PathSet>(in);
// !!! Maybe we want a queryPathInfos?
foreach (PathSet::iterator, i, paths) {
if (!store->isValidPath(*i))
continue;
ValidPathInfo info = store->queryPathInfo(*i);
writeString(info.path, out);
writeString(info.deriver, out);
writeStrings(info.references, out);
// !!! Maybe we want compression?
writeLongLong(info.narSize, out); // downloadSize
writeLongLong(info.narSize, out);
}
writeString("", out);
break;
}
default:
throw Error(format("unknown serve query `%1%'") % cmd);
}
out.flush(); out.flush();
break;
} }
break; case cmdQueryPathInfos: {
case cmdSubstitute: PathSet paths = readStorePaths<PathSet>(in);
dumpPath(readStorePath(in), out); // !!! Maybe we want a queryPathInfos?
break; foreach (PathSet::iterator, i, paths) {
default: if (!store->isValidPath(*i))
throw Error(format("unknown serve command `%1%'") % cmd); continue;
ValidPathInfo info = store->queryPathInfo(*i);
writeString(info.path, out);
writeString(info.deriver, out);
writeStrings(info.references, out);
// !!! Maybe we want compression?
writeLongLong(info.narSize, out); // downloadSize
writeLongLong(info.narSize, out);
}
writeString("", out);
out.flush();
break;
}
case cmdDumpStorePath:
dumpPath(readStorePath(in), out);
out.flush();
break;
case cmdImportPaths:
if (!writeAllowed) throw Error("importing paths not allowed");
store->importPaths(false, in);
writeInt(1, out); // indicate success
out.flush();
break;
default:
throw Error(format("unknown serve command %1%") % cmd);
}
} }
} }

View file

@ -2,23 +2,18 @@
namespace nix { namespace nix {
#define SERVE_MAGIC_1 0x390c9deb #define SERVE_MAGIC_1 0x390c9deb
#define SERVE_MAGIC_2 0x5452eecb #define SERVE_MAGIC_2 0x5452eecb
#define SERVE_PROTOCOL_VERSION 0x101 #define SERVE_PROTOCOL_VERSION 0x200
#define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00) #define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00)
#define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff) #define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff)
typedef enum { typedef enum {
cmdQuery = 0, cmdQueryValidPaths = 1,
cmdSubstitute = 1, cmdQueryPathInfos = 2,
cmdDumpStorePath = 3,
cmdImportPaths = 4,
} ServeCommand; } ServeCommand;
typedef enum {
qCmdHave = 0,
qCmdInfo = 1,
} QueryCommand;
} }