Low-latency closure copy

This adds a new store operation 'addMultipleToStore' that reads a
number of NARs and ValidPathInfos from a Source, allowing any number
of store paths to be copied in a single call. This is much faster on
high-latency links when copying a lot of small files, like .drv
closures.

For example, on a connection with an 50 ms delay:

Before:

  $ nix copy --to 'unix:///tmp/proxy-socket?root=/tmp/dest-chroot' \
    /nix/store/90jjw94xiyg5drj70whm9yll6xjj0ca9-hello-2.10.drv \
    --derivation --no-check-sigs
  real    0m57.868s
  user    0m0.103s
  sys     0m0.056s

After:

  real    0m0.690s
  user    0m0.017s
  sys     0m0.011s
This commit is contained in:
Eelco Dolstra 2021-07-26 13:31:09 +02:00
parent 9957315ce0
commit fe1f34fa60
8 changed files with 162 additions and 90 deletions

View file

@ -243,23 +243,6 @@ struct ClientSettings
} }
}; };
static void writeValidPathInfo(
ref<Store> store,
unsigned int clientVersion,
Sink & to,
std::shared_ptr<const ValidPathInfo> info)
{
to << (info->deriver ? store->printStorePath(*info->deriver) : "")
<< info->narHash.to_string(Base16, false);
worker_proto::write(*store, to, info->references);
to << info->registrationTime << info->narSize;
if (GET_PROTOCOL_MINOR(clientVersion) >= 16) {
to << info->ultimate
<< info->sigs
<< renderContentAddress(info->ca);
}
}
static std::vector<DerivedPath> readDerivedPaths(Store & store, unsigned int clientVersion, Source & from) static std::vector<DerivedPath> readDerivedPaths(Store & store, unsigned int clientVersion, Source & from)
{ {
std::vector<DerivedPath> reqs; std::vector<DerivedPath> reqs;
@ -422,9 +405,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
}(); }();
logger->stopWork(); logger->stopWork();
to << store->printStorePath(pathInfo->path); pathInfo->write(to, *store, GET_PROTOCOL_MINOR(clientVersion));
writeValidPathInfo(store, clientVersion, to, pathInfo);
} else { } else {
HashType hashAlgo; HashType hashAlgo;
std::string baseName; std::string baseName;
@ -471,6 +452,21 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
break; break;
} }
case wopAddMultipleToStore: {
bool repair, dontCheckSigs;
from >> repair >> dontCheckSigs;
if (!trusted && dontCheckSigs)
dontCheckSigs = false;
logger->startWork();
FramedSource source(from);
store->addMultipleToStore(source,
RepairFlag{repair},
dontCheckSigs ? NoCheckSigs : CheckSigs);
logger->stopWork();
break;
}
case wopAddTextToStore: { case wopAddTextToStore: {
string suffix = readString(from); string suffix = readString(from);
string s = readString(from); string s = readString(from);
@ -505,17 +501,6 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
break; break;
} }
case wopImportPaths2: {
logger->startWork();
auto paths = store->importPaths(from,
trusted ? NoCheckSigs : CheckSigs);
logger->stopWork();
Strings paths2;
for (auto & i : paths) paths2.push_back(store->printStorePath(i));
to << paths2;
break;
}
case wopBuildPaths: { case wopBuildPaths: {
auto drvs = readDerivedPaths(*store, clientVersion, from); auto drvs = readDerivedPaths(*store, clientVersion, from);
BuildMode mode = bmNormal; BuildMode mode = bmNormal;
@ -781,7 +766,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
if (info) { if (info) {
if (GET_PROTOCOL_MINOR(clientVersion) >= 17) if (GET_PROTOCOL_MINOR(clientVersion) >= 17)
to << 1; to << 1;
writeValidPathInfo(store, clientVersion, to, info); info->write(to, *store, GET_PROTOCOL_MINOR(clientVersion), false);
} else { } else {
assert(GET_PROTOCOL_MINOR(clientVersion) >= 17); assert(GET_PROTOCOL_MINOR(clientVersion) >= 17);
to << 0; to << 0;

46
src/libstore/path-info.cc Normal file
View file

@ -0,0 +1,46 @@
#include "path-info.hh"
#include "worker-protocol.hh"
namespace nix {
ValidPathInfo ValidPathInfo::read(Source & source, const Store & store, unsigned int format)
{
return read(source, store, format, store.parseStorePath(readString(source)));
}
ValidPathInfo ValidPathInfo::read(Source & source, const Store & store, unsigned int format, StorePath && path)
{
auto deriver = readString(source);
auto narHash = Hash::parseAny(readString(source), htSHA256);
ValidPathInfo info(path, narHash);
if (deriver != "") info.deriver = store.parseStorePath(deriver);
info.references = worker_proto::read(store, source, Phantom<StorePathSet> {});
source >> info.registrationTime >> info.narSize;
if (format >= 16) {
source >> info.ultimate;
info.sigs = readStrings<StringSet>(source);
info.ca = parseContentAddressOpt(readString(source));
}
return info;
}
void ValidPathInfo::write(
Sink & sink,
const Store & store,
unsigned int format,
bool includePath) const
{
if (includePath)
sink << store.printStorePath(path);
sink << (deriver ? store.printStorePath(*deriver) : "")
<< narHash.to_string(Base16, false);
worker_proto::write(store, sink, references);
sink << registrationTime << narSize;
if (format >= 16) {
sink << ultimate
<< sigs
<< renderContentAddress(ca);
}
}
}

View file

@ -105,6 +105,11 @@ struct ValidPathInfo
ValidPathInfo(const StorePath & path, Hash narHash) : path(path), narHash(narHash) { }; ValidPathInfo(const StorePath & path, Hash narHash) : path(path), narHash(narHash) { };
virtual ~ValidPathInfo() { } virtual ~ValidPathInfo() { }
static ValidPathInfo read(Source & source, const Store & store, unsigned int format);
static ValidPathInfo read(Source & source, const Store & store, unsigned int format, StorePath && path);
void write(Sink & sink, const Store & store, unsigned int format, bool includePath = true) const;
}; };
typedef std::map<StorePath, ValidPathInfo> ValidPathInfos; typedef std::map<StorePath, ValidPathInfo> ValidPathInfos;

View file

@ -386,23 +386,6 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S
} }
ref<const ValidPathInfo> RemoteStore::readValidPathInfo(ConnectionHandle & conn, const StorePath & path)
{
auto deriver = readString(conn->from);
auto narHash = Hash::parseAny(readString(conn->from), htSHA256);
auto info = make_ref<ValidPathInfo>(path, narHash);
if (deriver != "") info->deriver = parseStorePath(deriver);
info->references = worker_proto::read(*this, conn->from, Phantom<StorePathSet> {});
conn->from >> info->registrationTime >> info->narSize;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) {
conn->from >> info->ultimate;
info->sigs = readStrings<StringSet>(conn->from);
info->ca = parseContentAddressOpt(readString(conn->from));
}
return info;
}
void RemoteStore::queryPathInfoUncached(const StorePath & path, void RemoteStore::queryPathInfoUncached(const StorePath & path,
Callback<std::shared_ptr<const ValidPathInfo>> callback) noexcept Callback<std::shared_ptr<const ValidPathInfo>> callback) noexcept
{ {
@ -423,7 +406,8 @@ void RemoteStore::queryPathInfoUncached(const StorePath & path,
bool valid; conn->from >> valid; bool valid; conn->from >> valid;
if (!valid) throw InvalidPath("path '%s' is not valid", printStorePath(path)); if (!valid) throw InvalidPath("path '%s' is not valid", printStorePath(path));
} }
info = readValidPathInfo(conn, path); info = std::make_shared<ValidPathInfo>(
ValidPathInfo::read(conn->from, *this, GET_PROTOCOL_MINOR(conn->daemonVersion), StorePath{path}));
} }
callback(std::move(info)); callback(std::move(info));
} catch (...) { callback.rethrow(); } } catch (...) { callback.rethrow(); }
@ -525,8 +509,8 @@ ref<const ValidPathInfo> RemoteStore::addCAToStore(
}); });
} }
auto path = parseStorePath(readString(conn->from)); return make_ref<ValidPathInfo>(
return readValidPathInfo(conn, path); ValidPathInfo::read(conn->from, *this, GET_PROTOCOL_MINOR(conn->daemonVersion)));
} }
else { else {
if (repair) throw Error("repairing is not supported when building through the Nix daemon protocol < 1.25"); if (repair) throw Error("repairing is not supported when building through the Nix daemon protocol < 1.25");
@ -642,6 +626,25 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
} }
void RemoteStore::addMultipleToStore(
Source & source,
RepairFlag repair,
CheckSigsFlag checkSigs)
{
if (GET_PROTOCOL_MINOR(getConnection()->daemonVersion) >= 32) {
auto conn(getConnection());
conn->to
<< wopAddMultipleToStore
<< repair
<< !checkSigs;
conn.withFramedSink([&](Sink & sink) {
source.drainInto(sink);
});
} else
Store::addMultipleToStore(source, repair, checkSigs);
}
StorePath RemoteStore::addTextToStore(const string & name, const string & s, StorePath RemoteStore::addTextToStore(const string & name, const string & s,
const StorePathSet & references, RepairFlag repair) const StorePathSet & references, RepairFlag repair)
{ {
@ -885,16 +888,6 @@ void RemoteStore::queryMissing(const std::vector<DerivedPath> & targets,
} }
StorePaths RemoteStore::importPaths(Source & source, CheckSigsFlag checkSigs)
{
auto conn(getConnection());
conn->to << wopImportPaths2;
source.drainInto(conn->to);
conn.processStderr();
return worker_proto::read(*this, conn->from, Phantom<StorePaths> {});
}
void RemoteStore::connect() void RemoteStore::connect()
{ {
auto conn(getConnection()); auto conn(getConnection());
@ -1021,14 +1014,14 @@ std::exception_ptr RemoteStore::Connection::processStderr(Sink * sink, Source *
return nullptr; return nullptr;
} }
void ConnectionHandle::withFramedSink(std::function<void(Sink &sink)> fun) void ConnectionHandle::withFramedSink(std::function<void(Sink & sink)> fun)
{ {
(*this)->to.flush(); (*this)->to.flush();
std::exception_ptr ex; std::exception_ptr ex;
/* Handle log messages / exceptions from the remote on a /* Handle log messages / exceptions from the remote on a separate
separate thread. */ thread. */
std::thread stderrThread([&]() std::thread stderrThread([&]()
{ {
try { try {
@ -1061,7 +1054,6 @@ void ConnectionHandle::withFramedSink(std::function<void(Sink &sink)> fun)
stderrThread.join(); stderrThread.join();
if (ex) if (ex)
std::rethrow_exception(ex); std::rethrow_exception(ex);
} }
} }

View file

@ -78,6 +78,11 @@ public:
void addToStore(const ValidPathInfo & info, Source & nar, void addToStore(const ValidPathInfo & info, Source & nar,
RepairFlag repair, CheckSigsFlag checkSigs) override; RepairFlag repair, CheckSigsFlag checkSigs) override;
void addMultipleToStore(
Source & source,
RepairFlag repair,
CheckSigsFlag checkSigs) override;
StorePath addTextToStore(const string & name, const string & s, StorePath addTextToStore(const string & name, const string & s,
const StorePathSet & references, RepairFlag repair) override; const StorePathSet & references, RepairFlag repair) override;
@ -112,8 +117,6 @@ public:
StorePathSet & willBuild, StorePathSet & willSubstitute, StorePathSet & unknown, StorePathSet & willBuild, StorePathSet & willSubstitute, StorePathSet & unknown,
uint64_t & downloadSize, uint64_t & narSize) override; uint64_t & downloadSize, uint64_t & narSize) override;
StorePaths importPaths(Source & source, CheckSigsFlag checkSigs) override;
void connect() override; void connect() override;
unsigned int getProtocol() override; unsigned int getProtocol() override;
@ -153,8 +156,6 @@ protected:
virtual void narFromPath(const StorePath & path, Sink & sink) override; virtual void narFromPath(const StorePath & path, Sink & sink) override;
ref<const ValidPathInfo> readValidPathInfo(ConnectionHandle & conn, const StorePath & path);
private: private:
std::atomic_bool failed{false}; std::atomic_bool failed{false};

View file

@ -250,6 +250,20 @@ StorePath Store::addToStore(const string & name, const Path & _srcPath,
} }
void Store::addMultipleToStore(
Source & source,
RepairFlag repair,
CheckSigsFlag checkSigs)
{
auto expected = readNum<uint64_t>(source);
for (uint64_t i = 0; i < expected; ++i) {
auto info = ValidPathInfo::read(source, *this, 16);
info.ultimate = false;
addToStore(info, source, repair, checkSigs);
}
}
/* /*
The aim of this function is to compute in one pass the correct ValidPathInfo for The aim of this function is to compute in one pass the correct ValidPathInfo for
the files that we are trying to add to the store. To accomplish that in one the files that we are trying to add to the store. To accomplish that in one
@ -771,6 +785,19 @@ const Store::Stats & Store::getStats()
} }
static std::string makeCopyPathMessage(
std::string_view srcUri,
std::string_view dstUri,
std::string_view storePath)
{
return srcUri == "local" || srcUri == "daemon"
? fmt("copying path '%s' to '%s'", storePath, dstUri)
: dstUri == "local" || dstUri == "daemon"
? fmt("copying path '%s' from '%s'", storePath, srcUri)
: fmt("copying path '%s' from '%s' to '%s'", storePath, srcUri, dstUri);
}
void copyStorePath( void copyStorePath(
Store & srcStore, Store & srcStore,
Store & dstStore, Store & dstStore,
@ -780,14 +807,10 @@ void copyStorePath(
{ {
auto srcUri = srcStore.getUri(); auto srcUri = srcStore.getUri();
auto dstUri = dstStore.getUri(); auto dstUri = dstStore.getUri();
auto storePathS = srcStore.printStorePath(storePath);
Activity act(*logger, lvlInfo, actCopyPath, Activity act(*logger, lvlInfo, actCopyPath,
srcUri == "local" || srcUri == "daemon" makeCopyPathMessage(srcUri, dstUri, storePathS),
? fmt("copying path '%s' to '%s'", srcStore.printStorePath(storePath), dstUri) {storePathS, srcUri, dstUri});
: dstUri == "local" || dstUri == "daemon"
? fmt("copying path '%s' from '%s'", srcStore.printStorePath(storePath), srcUri)
: fmt("copying path '%s' from '%s' to '%s'", srcStore.printStorePath(storePath), srcUri, dstUri),
{srcStore.printStorePath(storePath), srcUri, dstUri});
PushActivity pact(act.id); PushActivity pact(act.id);
auto info = srcStore.queryPathInfo(storePath); auto info = srcStore.queryPathInfo(storePath);
@ -896,19 +919,31 @@ std::map<StorePath, StorePath> copyPaths(
for (auto & path : storePaths) for (auto & path : storePaths)
pathsMap.insert_or_assign(path, path); pathsMap.insert_or_assign(path, path);
// FIXME: Temporary hack to copy closures in a single round-trip.
if (dynamic_cast<RemoteStore *>(&dstStore)) {
if (!missing.empty()) {
auto source = sinkToSource([&](Sink & sink) {
srcStore.exportPaths(missing, sink);
});
dstStore.importPaths(*source, NoCheckSigs);
}
return pathsMap;
}
Activity act(*logger, lvlInfo, actCopyPaths, fmt("copying %d paths", missing.size())); Activity act(*logger, lvlInfo, actCopyPaths, fmt("copying %d paths", missing.size()));
auto sorted = srcStore.topoSortPaths(missing);
std::reverse(sorted.begin(), sorted.end());
auto source = sinkToSource([&](Sink & sink) {
sink << sorted.size();
for (auto & storePath : sorted) {
auto srcUri = srcStore.getUri();
auto dstUri = dstStore.getUri();
auto storePathS = srcStore.printStorePath(storePath);
Activity act(*logger, lvlInfo, actCopyPath,
makeCopyPathMessage(srcUri, dstUri, storePathS),
{storePathS, srcUri, dstUri});
PushActivity pact(act.id);
auto info = srcStore.queryPathInfo(storePath);
info->write(sink, srcStore, 16);
srcStore.narFromPath(storePath, sink);
}
});
dstStore.addMultipleToStore(*source, repair, checkSigs);
#if 0
std::atomic<size_t> nrDone{0}; std::atomic<size_t> nrDone{0};
std::atomic<size_t> nrFailed{0}; std::atomic<size_t> nrFailed{0};
std::atomic<uint64_t> bytesExpected{0}; std::atomic<uint64_t> bytesExpected{0};
@ -986,6 +1021,8 @@ std::map<StorePath, StorePath> copyPaths(
nrDone++; nrDone++;
showProgress(); showProgress();
}); });
#endif
return pathsMap; return pathsMap;
} }

View file

@ -440,6 +440,12 @@ public:
virtual void addToStore(const ValidPathInfo & info, Source & narSource, virtual void addToStore(const ValidPathInfo & info, Source & narSource,
RepairFlag repair = NoRepair, CheckSigsFlag checkSigs = CheckSigs) = 0; RepairFlag repair = NoRepair, CheckSigsFlag checkSigs = CheckSigs) = 0;
/* Import multiple paths into the store. */
virtual void addMultipleToStore(
Source & source,
RepairFlag repair = NoRepair,
CheckSigsFlag checkSigs = CheckSigs);
/* Copy the contents of a path to the store and register the /* Copy the contents of a path to the store and register the
validity the resulting path. The resulting path is returned. validity the resulting path. The resulting path is returned.
The function object `filter' can be used to exclude files (see The function object `filter' can be used to exclude files (see

View file

@ -9,7 +9,7 @@ namespace nix {
#define WORKER_MAGIC_1 0x6e697863 #define WORKER_MAGIC_1 0x6e697863
#define WORKER_MAGIC_2 0x6478696f #define WORKER_MAGIC_2 0x6478696f
#define PROTOCOL_VERSION (1 << 8 | 31) #define PROTOCOL_VERSION (1 << 8 | 32)
#define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00) #define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00)
#define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff) #define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff)
@ -55,7 +55,7 @@ typedef enum {
wopQueryDerivationOutputMap = 41, wopQueryDerivationOutputMap = 41,
wopRegisterDrvOutput = 42, wopRegisterDrvOutput = 42,
wopQueryRealisation = 43, wopQueryRealisation = 43,
wopImportPaths2 = 44, // hack wopAddMultipleToStore = 44,
} WorkerOp; } WorkerOp;