download-from-binary-cache: parallelise fetching of NAR info files

Getting substitute information using the binary cache substituter has
non-trivial latency overhead.  A package or NixOS system configuration
can have hundreds of dependencies, and in the worst case (when the
local info cache is empty) we have to do a separate HTTP request for
each of these.  If the ping time to the server is t, getting N info
files will take tN seconds; e.g., with a ping time of 0.1s to
nixos.org, sequentially downloading 1000 info files (a typical NixOS
config) will take at least 100 seconds.

To fix this problem, the binary cache substituter can now perform
requests in parallel.  This required changing the substituter
interface to support a function querySubstitutablePathInfos() that
queries multiple paths at the same time, and rewriting queryMissing()
to take advantage of parallelism.  (Due to local caching,
parallelising queryMissing() is sufficient for most use cases, since
it's almost always called before building a derivation and thus fills
the local info cache.)

For example, parallelism speeds up querying all 1056 paths in a
particular NixOS system configuration from 116s to 2.6s.  It works so
well because the eccentricity of the top-level derivation in the
dependency graph is only 9.  So we only need 10 round-trips (when
using an unlimited number of parallel connections) to get everything.

Currently we do a maximum of 150 parallel connections to the server.
Thus it's important that the binary cache server (e.g. nixos.org) has
a high connection limit.  Alternatively we could use HTTP pipelining,
but WWW::Curl doesn't support it and libcurl has a hard-coded limit of
5 requests per pipeline.
This commit is contained in:
Eelco Dolstra 2012-07-06 19:08:20 -04:00
parent cd94665f38
commit 11800e6198
9 changed files with 285 additions and 134 deletions

View file

@ -12,33 +12,40 @@ use strict;
my @binaryCacheUrls = map { s/\/+$//; $_ } split(/ /, ($ENV{"NIX_BINARY_CACHES"} || ""));
my $maxParallelRequests = 150;
my ($dbh, $insertNAR, $queryNAR, $insertNegativeNAR, $queryNegativeNAR);
my %cacheIds;
my $curlm = WWW::Curl::Multi->new;
my $activeRequests = 0;
my $curlIdCount = 1;
my %curlHandles;
my %requests;
my %scheduled;
my $caBundle = $ENV{"CURL_CA_BUNDLE"} || $ENV{"OPENSSL_X509_CERT_FILE"};
sub addRequest {
my ($url) = @_;
my ($storePath, $url) = @_;
my $curl = WWW::Curl::Easy->new;
my $curlId = $curlIdCount++;
$curlHandles{$curlId} = { handle => $curl, content => "" };
$requests{$curlId} = { storePath => $storePath, url => $url, handle => $curl, content => "" };
$curl->setopt(CURLOPT_PRIVATE, $curlId);
$curl->setopt(CURLOPT_URL, $url);
$curl->setopt(CURLOPT_WRITEDATA, \$curlHandles{$curlId}->{content});
$curl->setopt(CURLOPT_WRITEDATA, \$requests{$curlId}->{content});
$curl->setopt(CURLOPT_FOLLOWLOCATION, 1);
$curl->setopt(CURLOPT_CAINFO, $caBundle) if defined $caBundle;
if ($activeRequests >= $maxParallelRequests) {
$scheduled{$curlId} = 1;
} else {
$curlm->add_handle($curl);
$activeRequests++;
}
return $curlHandles{$curlId};
return $requests{$curlId};
}
@ -55,12 +62,20 @@ sub processRequests {
if ($curlm->perform() != $activeRequests) {
while (my ($id, $result) = $curlm->info_read) {
if ($id) {
my $handle = $curlHandles{$id}->{handle};
$curlHandles{$id}->{result} = $result;
$curlHandles{$id}->{httpStatus} = $handle->getinfo(CURLINFO_HTTP_CODE);
#print STDERR "\nRequest completed ($id, $result, $curlHandles{$id}->{httpStatus})\n";
my $handle = $requests{$id}->{handle};
$requests{$id}->{result} = $result;
$requests{$id}->{httpStatus} = $handle->getinfo(CURLINFO_HTTP_CODE);
#print STDERR "\nRequest completed ($id, $result, $requests{$id}->{httpStatus})\n";
$activeRequests--;
delete $curlHandles{$id}->{handle};
delete $requests{$id}->{handle};
if (scalar(keys %scheduled) > 0) {
my $id2 = (keys %scheduled)[0];
$curlm->add_handle($requests{$id2}->{handle});
$activeRequests++;
delete $scheduled{$id2};
}
}
}
}
@ -130,23 +145,21 @@ EOF
}
sub getInfoFrom {
my ($storePath, $pathHash, $binaryCacheUrl) = @_;
sub negativeHit {
my ($storePath, $binaryCacheUrl) = @_;
$queryNegativeNAR->execute(getCacheId($binaryCacheUrl), basename($storePath));
return @{$queryNegativeNAR->fetchall_arrayref()} != 0;
}
sub processNARInfo {
my ($storePath, $binaryCacheUrl, $request) = @_;
my $cacheId = getCacheId($binaryCacheUrl);
# Bail out if there is a negative cache entry.
$queryNegativeNAR->execute($cacheId, basename($storePath));
return undef if @{$queryNegativeNAR->fetchall_arrayref()} != 0;
my $infoUrl = "$binaryCacheUrl/$pathHash.narinfo";
print STDERR "checking $infoUrl...\n";
my $request = addRequest($infoUrl);
processRequests;
if ($request->{result} != 0 || $request->{httpStatus} != 200) {
if ($request->{httpStatus} != 404) {
print STDERR "could not download $infoUrl (" .
print STDERR "could not download $request->{url} (" .
($request->{result} != 0 ? "Curl error $request->{result}" : "HTTP status $request->{httpStatus}") . ")\n";
} else {
$insertNegativeNAR->execute($cacheId, basename($storePath), time());
@ -172,7 +185,7 @@ sub getInfoFrom {
}
return undef if $storePath ne $storePath2;
if ($storePath ne $storePath2 || !defined $url || !defined $narHash) {
print STDERR "bad NAR info file $infoUrl\n";
print STDERR "bad NAR info file $request->{url}\n";
return undef;
}
@ -236,24 +249,65 @@ sub cachedGetInfoFrom {
}
sub getInfo {
my ($storePath) = @_;
sub printInfo {
my ($storePath, $info) = @_;
print "$storePath\n";
print $info->{deriver} ? "$Nix::Config::storeDir/$info->{deriver}" : "", "\n";
print scalar @{$info->{refs}}, "\n";
print "$Nix::Config::storeDir/$_\n" foreach @{$info->{refs}};
print $info->{fileSize} || 0, "\n";
print $info->{narSize} || 0, "\n";
}
sub printInfoParallel {
my @paths = @_;
# First print all paths for which we have cached info.
my @left;
foreach my $storePath (@paths) {
my $pathHash = substr(basename($storePath), 0, 32);
# First look if we have cached info for one of the URLs.
my $found = 0;
foreach my $binaryCacheUrl (@binaryCacheUrls) {
my $info = cachedGetInfoFrom($storePath, $pathHash, $binaryCacheUrl);
return $info if defined $info;
if (defined $info) {
printInfo($storePath, $info);
$found = 1;
last;
}
}
push @left, $storePath if !$found;
}
# No, so do an HTTP request until we get a hit.
return if scalar @left == 0;
foreach my $binaryCacheUrl (@binaryCacheUrls) {
my $info = getInfoFrom($storePath, $pathHash, $binaryCacheUrl);
return $info if defined $info;
my @left2;
%requests = ();
foreach my $storePath (@left) {
my $pathHash = substr(basename($storePath), 0, 32);
if (negativeHit($storePath, $binaryCacheUrl)) {
push @left2, $storePath;
next;
}
my $infoUrl = "$binaryCacheUrl/$pathHash.narinfo";
addRequest($storePath, $infoUrl);
}
return undef;
processRequests;
foreach my $request (values %requests) {
my $info = processNARInfo($request->{storePath}, $binaryCacheUrl, $request);
if (defined $info) {
printInfo($request->{storePath}, $info);
} else {
push @left2, $request->{storePath};
}
}
@left = @left2;
}
}
@ -264,8 +318,16 @@ sub downloadBinary {
cache: foreach my $binaryCacheUrl (@binaryCacheUrls) {
my $info = cachedGetInfoFrom($storePath, $pathHash, $binaryCacheUrl);
$info = getInfoFrom($storePath, $pathHash, $binaryCacheUrl) unless defined $info;
if (defined $info) {
unless (defined $info) {
next if negativeHit($storePath, $binaryCacheUrl);
my $request = addRequest($storePath, "$binaryCacheUrl/$pathHash.narinfo");
processRequests;
$info = processNARInfo($storePath, $binaryCacheUrl, $request);
}
next unless defined $info;
my $decompressor;
if ($info->{compression} eq "bzip2") { $decompressor = "$Nix::Config::bzip2 -d"; }
elsif ($info->{compression} eq "xz") { $decompressor = "$Nix::Config::xz -d"; }
@ -288,7 +350,6 @@ sub downloadBinary {
print STDERR "\n";
return 1;
}
}
return 0;
}
@ -300,29 +361,20 @@ initCache();
if ($ARGV[0] eq "--query") {
while (<STDIN>) {
my $cmd = $_; chomp $cmd;
chomp;
my ($cmd, @args) = split " ", $_;
if ($cmd eq "have") {
my $storePath = <STDIN>; chomp $storePath;
# FIXME: want to give correct info here, but it's too slow.
#print "0\n";
my $info = getInfo($storePath);
if (defined $info) { print "1\n"; } else { print "0\n"; }
print "0\n";
#my $info = getInfo($storePath);
#if (defined $info) { print "1\n"; } else { print "0\n"; }
}
elsif ($cmd eq "info") {
my $storePath = <STDIN>; chomp $storePath;
my $info = getInfo($storePath);
if (defined $info) {
print "1\n";
print $info->{deriver} ? "$Nix::Config::storeDir/$info->{deriver}" : "", "\n";
print scalar @{$info->{refs}}, "\n";
print "$Nix::Config::storeDir/$_\n" foreach @{$info->{refs}};
print $info->{fileSize} || 0, "\n";
print $info->{narSize} || 0, "\n";
} else {
print "0\n";
}
printInfoParallel(@args);
print "\n";
}
else { die "unknown command `$cmd'"; }

View file

@ -2352,10 +2352,12 @@ void SubstitutionGoal::tryNext()
sub = subs.front();
subs.pop_front();
if (!worker.store.querySubstitutablePathInfo(sub, storePath, info)) {
tryNext();
return;
}
SubstitutablePathInfos infos;
PathSet dummy(singleton<PathSet>(storePath));
worker.store.querySubstitutablePathInfos(sub, dummy, infos);
SubstitutablePathInfos::iterator k = infos.find(storePath);
if (k == infos.end()) { tryNext(); return; }
info = k->second;
/* To maintain the closure invariant, we first have to realise the
paths referenced by this one. */

View file

@ -155,8 +155,9 @@ void setDefaultsFromEnvironment()
string subs = getEnv("NIX_SUBSTITUTERS", "default");
if (subs == "default") {
substituters.push_back(nixLibexecDir + "/nix/substituters/copy-from-other-stores.pl");
substituters.push_back(nixLibexecDir + "/nix/substituters/download-using-manifests.pl");
//substituters.push_back(nixLibexecDir + "/nix/substituters/copy-from-other-stores.pl");
//substituters.push_back(nixLibexecDir + "/nix/substituters/download-using-manifests.pl");
substituters.push_back(nixLibexecDir + "/nix/substituters/download-from-binary-cache.pl");
} else
substituters = tokenizeString(subs, ":");

View file

@ -936,16 +936,23 @@ bool LocalStore::hasSubstitutes(const Path & path)
}
bool LocalStore::querySubstitutablePathInfo(const Path & substituter,
const Path & path, SubstitutablePathInfo & info)
void LocalStore::querySubstitutablePathInfos(const Path & substituter,
PathSet & paths, SubstitutablePathInfos & infos)
{
RunningSubstituter & run(runningSubstituters[substituter]);
startSubstituter(substituter, run);
writeLine(run.to, "info\n" + path);
if (!getIntLine<int>(run.from)) return false;
string s = "info ";
foreach (PathSet::const_iterator, i, paths)
if (infos.find(*i) == infos.end()) { s += *i; s += " "; }
writeLine(run.to, s);
while (true) {
Path path = readLine(run.from);
if (path == "") break;
assert(paths.find(path) != paths.end());
paths.erase(path);
SubstitutablePathInfo & info(infos[path]);
info.deriver = readLine(run.from);
if (info.deriver != "") assertStorePath(info.deriver);
int nrRefs = getIntLine<int>(run.from);
@ -956,17 +963,30 @@ bool LocalStore::querySubstitutablePathInfo(const Path & substituter,
}
info.downloadSize = getIntLine<long long>(run.from);
info.narSize = getIntLine<long long>(run.from);
return true;
}
}
bool LocalStore::querySubstitutablePathInfo(const Path & path,
SubstitutablePathInfo & info)
{
foreach (Paths::iterator, i, substituters)
if (querySubstitutablePathInfo(*i, path, info)) return true;
return false;
SubstitutablePathInfos infos;
querySubstitutablePathInfos(singleton<PathSet>(path), infos);
SubstitutablePathInfos::iterator i = infos.find(path);
if (i == infos.end()) return false;
info = i->second;
return true;
}
void LocalStore::querySubstitutablePathInfos(const PathSet & paths,
SubstitutablePathInfos & infos)
{
PathSet todo = paths;
foreach (Paths::iterator, i, substituters) {
if (todo.empty()) break;
querySubstitutablePathInfos(*i, todo, infos);
}
}

View file

@ -128,8 +128,11 @@ public:
bool querySubstitutablePathInfo(const Path & path,
SubstitutablePathInfo & info);
bool querySubstitutablePathInfo(const Path & substituter,
const Path & path, SubstitutablePathInfo & info);
void querySubstitutablePathInfos(const Path & substituter,
PathSet & paths, SubstitutablePathInfos & infos);
void querySubstitutablePathInfos(const PathSet & paths,
SubstitutablePathInfos & infos);
Path addToStore(const Path & srcPath,
bool recursive = true, HashType hashAlgo = htSHA256,

View file

@ -55,45 +55,97 @@ void queryMissing(StoreAPI & store, const PathSet & targets,
PathSet todo(targets.begin(), targets.end()), done;
while (!todo.empty()) {
Path p = *(todo.begin());
todo.erase(p);
if (done.find(p) != done.end()) continue;
done.insert(p);
bool useSubstitutes = queryBoolSetting("build-use-substitutes", true);
if (isDerivation(p)) {
if (!store.isValidPath(p)) {
unknown.insert(p);
/* Getting substitute info has high latency when using the binary
cache substituter. Thus it's essential to do substitute
queries in parallel as much as possible. To accomplish this
we do the following:
- For all paths still to be processed (todo), we add all
paths for which we need info to the set query. For an
unbuilt derivation this is the output paths; otherwise, it's
the path itself.
- We get info about all paths in query in parallel.
- We process the results and add new items to todo if
necessary. E.g. if a path is substitutable, then we need to
get info on its references.
- Repeat until todo is empty.
*/
while (!todo.empty()) {
PathSet query, todoDrv, todoNonDrv;
foreach (PathSet::iterator, i, todo) {
if (done.find(*i) != done.end()) continue;
done.insert(*i);
if (isDerivation(*i)) {
if (!store.isValidPath(*i)) {
// FIXME: we could try to substitute p.
unknown.insert(*i);
continue;
}
Derivation drv = derivationFromPath(store, p);
Derivation drv = derivationFromPath(store, *i);
PathSet invalid;
foreach (DerivationOutputs::iterator, j, drv.outputs)
if (!store.isValidPath(j->second.path)) invalid.insert(j->second.path);
if (invalid.empty()) continue;
todoDrv.insert(*i);
if (useSubstitutes) query.insert(invalid.begin(), invalid.end());
}
else {
if (store.isValidPath(*i)) continue;
query.insert(*i);
todoNonDrv.insert(*i);
}
}
todo.clear();
SubstitutablePathInfos infos;
store.querySubstitutablePathInfos(query, infos);
foreach (PathSet::iterator, i, todoDrv) {
// FIXME: cache this
Derivation drv = derivationFromPath(store, *i);
bool mustBuild = false;
foreach (DerivationOutputs::iterator, i, drv.outputs)
if (!store.isValidPath(i->second.path) &&
!(queryBoolSetting("build-use-substitutes", true) && store.hasSubstitutes(i->second.path)))
if (useSubstitutes) {
foreach (DerivationOutputs::iterator, j, drv.outputs)
if (!store.isValidPath(j->second.path) &&
infos.find(j->second.path) == infos.end())
mustBuild = true;
} else
mustBuild = true;
if (mustBuild) {
willBuild.insert(p);
willBuild.insert(*i);
todo.insert(drv.inputSrcs.begin(), drv.inputSrcs.end());
foreach (DerivationInputs::iterator, i, drv.inputDrvs)
todo.insert(i->first);
} else
foreach (DerivationOutputs::iterator, i, drv.outputs)
todo.insert(i->second.path);
todoNonDrv.insert(i->second.path);
}
else {
if (store.isValidPath(p)) continue;
SubstitutablePathInfo info;
if (store.querySubstitutablePathInfo(p, info)) {
willSubstitute.insert(p);
downloadSize += info.downloadSize;
narSize += info.narSize;
todo.insert(info.references.begin(), info.references.end());
foreach (PathSet::iterator, i, todoNonDrv) {
done.insert(*i);
SubstitutablePathInfos::iterator info = infos.find(*i);
if (info != infos.end()) {
willSubstitute.insert(*i);
downloadSize += info->second.downloadSize;
narSize += info->second.narSize;
todo.insert(info->second.references.begin(), info->second.references.end());
} else
unknown.insert(p);
unknown.insert(*i);
}
}
}

View file

@ -256,6 +256,19 @@ bool RemoteStore::querySubstitutablePathInfo(const Path & path,
}
void RemoteStore::querySubstitutablePathInfos(const PathSet & paths,
SubstitutablePathInfos & infos)
{
if (paths.empty()) return;
printMsg(lvlError, format("QUERYING %1% (REMOTE)") % showPaths(paths));
foreach (PathSet::const_iterator, i, paths) {
SubstitutablePathInfo info;
if (querySubstitutablePathInfo(*i, info))
infos[*i] = info;
}
}
ValidPathInfo RemoteStore::queryPathInfo(const Path & path)
{
openConnection();

View file

@ -48,6 +48,9 @@ public:
bool querySubstitutablePathInfo(const Path & path,
SubstitutablePathInfo & info);
void querySubstitutablePathInfos(const PathSet & paths,
SubstitutablePathInfos & infos);
Path addToStore(const Path & srcPath,
bool recursive = true, HashType hashAlgo = htSHA256,
PathFilter & filter = defaultPathFilter);

View file

@ -86,6 +86,8 @@ struct SubstitutablePathInfo
unsigned long long narSize; /* 0 = unknown */
};
typedef std::map<Path, SubstitutablePathInfo> SubstitutablePathInfos;
struct ValidPathInfo
{
@ -148,6 +150,9 @@ public:
virtual bool querySubstitutablePathInfo(const Path & path,
SubstitutablePathInfo & info) = 0;
virtual void querySubstitutablePathInfos(const PathSet & paths,
SubstitutablePathInfos & infos) = 0;
/* Copy the contents of a path to the store and register the
validity the resulting path. The resulting path is returned.
The function object `filter' can be used to exclude files (see