nix-copy-closure: Restore compression and the progress viewer

This commit is contained in:
Eelco Dolstra 2014-07-10 14:15:12 +02:00
parent 7911e4c27a
commit 1114c7bd57
3 changed files with 89 additions and 16 deletions

View file

@ -15,6 +15,16 @@ sub readInt {
} }
sub writeString {
my ($s, $to) = @_;
my $len = length $s;
my $req .= pack("L<x4", $len);
$req .= $s;
$req .= "\000" x (8 - $len % 8) if $len % 8;
syswrite($to, $req) or die;
}
sub copyTo { sub copyTo {
my ($sshHost, $sshOpts, $storePaths, $compressor, $decompressor, my ($sshHost, $sshOpts, $storePaths, $compressor, $decompressor,
$includeOutputs, $dryRun, $sign, $progressViewer, $useSubstitutes) = @_; $includeOutputs, $dryRun, $sign, $progressViewer, $useSubstitutes) = @_;
@ -49,16 +59,10 @@ sub copyTo {
} }
# Send the "query valid paths" command with the "lock" option # Send the "query valid paths" command with the "lock" option
# enabled. This prevens a race where the remote host # enabled. This prevents a race where the remote host
# garbage-collect paths that are already there. # garbage-collect paths that are already there.
my $req = pack("L<x4L<x4L<x4", 1, 1, scalar @closure); syswrite($to, pack("L<x4L<x4L<x4", 1, 1, scalar @closure)) or die;
for my $s (@closure) { writeString($_, $to) foreach @closure;
my $len = length $s;
$req .= pack("L<x4", $len);
$req .= $s;
$req .= "\000" x (8 - $len % 8) if $len % 8;
}
syswrite($to, $req) or die;
# Get back the set of paths that are already valid on the remote host. # Get back the set of paths that are already valid on the remote host.
my %present; my %present;
@ -76,11 +80,35 @@ sub copyTo {
# Send the "import paths" command. # Send the "import paths" command.
syswrite($to, pack("L<x4", 4)) or die; syswrite($to, pack("L<x4", 4)) or die;
writeString($compressor, $to);
if ($compressor || $progressViewer) {
# Compute the size of the closure for the progress viewer.
if ($progressViewer) {
my $missingSize = 0;
$missingSize += (queryPathInfo($_, 1))[3] foreach @missing;
$progressViewer = "$progressViewer -s $missingSize";
}
# Start the compressor and/or progress viewer in between us
# and the remote host.
my $to_;
my $pid2 = open2(">&" . fileno($to), $to_,
$progressViewer && $compressor ? "$progressViewer | $compressor" : $progressViewer || $compressor);
close $to;
exportPaths(fileno($to_), $sign, @missing);
close $to_;
waitpid $pid2, 0;
} else {
exportPaths(fileno($to), $sign, @missing); exportPaths(fileno($to), $sign, @missing);
readInt($from) == 1 or die; close $to;
}
readInt($from) == 1 or die "remote machine \`$sshHost' failed to import closure\n";
# Shut down the server process. # Shut down the server process.
close $to;
waitpid $pid, 0; waitpid $pid, 0;
} }

View file

@ -42,11 +42,11 @@ while (@ARGV) {
} }
elsif ($arg eq "--gzip") { elsif ($arg eq "--gzip") {
$compressor = "gzip"; $compressor = "gzip";
$decompressor = "gunzip"; $decompressor = "gzip -d";
} }
elsif ($arg eq "--bzip2") { elsif ($arg eq "--bzip2") {
$compressor = "bzip2"; $compressor = "bzip2";
$decompressor = "bunzip2"; $decompressor = "bzip2 -d";
} }
elsif ($arg eq "--xz") { elsif ($arg eq "--xz") {
$compressor = "xz"; $compressor = "xz";

View file

@ -928,12 +928,57 @@ static void opServe(Strings opFlags, Strings opArgs)
dumpPath(readStorePath(in), out); dumpPath(readStorePath(in), out);
out.flush(); out.flush();
break; break;
case cmdImportPaths: case cmdImportPaths: {
if (!writeAllowed) throw Error("importing paths not allowed"); if (!writeAllowed) throw Error("importing paths not allowed");
string compression = readString(in);
if (compression != "") {
if (compression != "gzip" && compression != "bzip2" && compression != "xz")
throw Error(format("unsupported compression method `%1%'") % compression);
Pipe fromDecompressor;
fromDecompressor.create();
Pid pid;
pid = fork();
switch (pid) {
case -1:
throw SysError("unable to fork");
case 0: /* child */
try {
fromDecompressor.readSide.close();
if (dup2(fromDecompressor.writeSide, STDOUT_FILENO) == -1)
throw SysError("dupping stdout");
// FIXME: use absolute path.
execlp(compression.c_str(), compression.c_str(), "-d", NULL);
throw SysError(format("executing `%1%'") % compression);
} catch (std::exception & e) {
std::cerr << "error: " << e.what() << std::endl;
}
_exit(1);
}
fromDecompressor.writeSide.close();
FdSource fromDecompressor_(fromDecompressor.readSide);
store->importPaths(false, fromDecompressor_);
pid.wait(true);
} else
store->importPaths(false, in); store->importPaths(false, in);
writeInt(1, out); // indicate success writeInt(1, out); // indicate success
out.flush(); out.flush();
/* The decompressor will have left stdin in an
undefined state, so we can't continue. */
if (compression != "") return;
break; break;
}
default: default:
throw Error(format("unknown serve command %1%") % cmd); throw Error(format("unknown serve command %1%") % cmd);
} }