From fe1f34fa60ad79e339c38e58af071a44774663f7 Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Mon, 26 Jul 2021 13:31:09 +0200 Subject: [PATCH] Low-latency closure copy This adds a new store operation 'addMultipleToStore' that reads a number of NARs and ValidPathInfos from a Source, allowing any number of store paths to be copied in a single call. This is much faster on high-latency links when copying a lot of small files, like .drv closures. For example, on a connection with an 50 ms delay: Before: $ nix copy --to 'unix:///tmp/proxy-socket?root=/tmp/dest-chroot' \ /nix/store/90jjw94xiyg5drj70whm9yll6xjj0ca9-hello-2.10.drv \ --derivation --no-check-sigs real 0m57.868s user 0m0.103s sys 0m0.056s After: real 0m0.690s user 0m0.017s sys 0m0.011s --- src/libstore/daemon.cc | 49 ++++++++-------------- src/libstore/path-info.cc | 46 +++++++++++++++++++++ src/libstore/path-info.hh | 5 +++ src/libstore/remote-store.cc | 60 ++++++++++++--------------- src/libstore/remote-store.hh | 9 ++-- src/libstore/store-api.cc | 73 +++++++++++++++++++++++++-------- src/libstore/store-api.hh | 6 +++ src/libstore/worker-protocol.hh | 4 +- 8 files changed, 162 insertions(+), 90 deletions(-) create mode 100644 src/libstore/path-info.cc diff --git a/src/libstore/daemon.cc b/src/libstore/daemon.cc index 468152f7f..d68ff64d7 100644 --- a/src/libstore/daemon.cc +++ b/src/libstore/daemon.cc @@ -243,23 +243,6 @@ struct ClientSettings } }; -static void writeValidPathInfo( - ref store, - unsigned int clientVersion, - Sink & to, - std::shared_ptr info) -{ - to << (info->deriver ? store->printStorePath(*info->deriver) : "") - << info->narHash.to_string(Base16, false); - worker_proto::write(*store, to, info->references); - to << info->registrationTime << info->narSize; - if (GET_PROTOCOL_MINOR(clientVersion) >= 16) { - to << info->ultimate - << info->sigs - << renderContentAddress(info->ca); - } -} - static std::vector readDerivedPaths(Store & store, unsigned int clientVersion, Source & from) { std::vector reqs; @@ -422,9 +405,7 @@ static void performOp(TunnelLogger * logger, ref store, }(); logger->stopWork(); - to << store->printStorePath(pathInfo->path); - writeValidPathInfo(store, clientVersion, to, pathInfo); - + pathInfo->write(to, *store, GET_PROTOCOL_MINOR(clientVersion)); } else { HashType hashAlgo; std::string baseName; @@ -471,6 +452,21 @@ static void performOp(TunnelLogger * logger, ref store, break; } + case wopAddMultipleToStore: { + bool repair, dontCheckSigs; + from >> repair >> dontCheckSigs; + if (!trusted && dontCheckSigs) + dontCheckSigs = false; + + logger->startWork(); + FramedSource source(from); + store->addMultipleToStore(source, + RepairFlag{repair}, + dontCheckSigs ? NoCheckSigs : CheckSigs); + logger->stopWork(); + break; + } + case wopAddTextToStore: { string suffix = readString(from); string s = readString(from); @@ -505,17 +501,6 @@ static void performOp(TunnelLogger * logger, ref store, break; } - case wopImportPaths2: { - logger->startWork(); - auto paths = store->importPaths(from, - trusted ? NoCheckSigs : CheckSigs); - logger->stopWork(); - Strings paths2; - for (auto & i : paths) paths2.push_back(store->printStorePath(i)); - to << paths2; - break; - } - case wopBuildPaths: { auto drvs = readDerivedPaths(*store, clientVersion, from); BuildMode mode = bmNormal; @@ -781,7 +766,7 @@ static void performOp(TunnelLogger * logger, ref store, if (info) { if (GET_PROTOCOL_MINOR(clientVersion) >= 17) to << 1; - writeValidPathInfo(store, clientVersion, to, info); + info->write(to, *store, GET_PROTOCOL_MINOR(clientVersion), false); } else { assert(GET_PROTOCOL_MINOR(clientVersion) >= 17); to << 0; diff --git a/src/libstore/path-info.cc b/src/libstore/path-info.cc new file mode 100644 index 000000000..fda55b2b6 --- /dev/null +++ b/src/libstore/path-info.cc @@ -0,0 +1,46 @@ +#include "path-info.hh" +#include "worker-protocol.hh" + +namespace nix { + +ValidPathInfo ValidPathInfo::read(Source & source, const Store & store, unsigned int format) +{ + return read(source, store, format, store.parseStorePath(readString(source))); +} + +ValidPathInfo ValidPathInfo::read(Source & source, const Store & store, unsigned int format, StorePath && path) +{ + auto deriver = readString(source); + auto narHash = Hash::parseAny(readString(source), htSHA256); + ValidPathInfo info(path, narHash); + if (deriver != "") info.deriver = store.parseStorePath(deriver); + info.references = worker_proto::read(store, source, Phantom {}); + source >> info.registrationTime >> info.narSize; + if (format >= 16) { + source >> info.ultimate; + info.sigs = readStrings(source); + info.ca = parseContentAddressOpt(readString(source)); + } + return info; +} + +void ValidPathInfo::write( + Sink & sink, + const Store & store, + unsigned int format, + bool includePath) const +{ + if (includePath) + sink << store.printStorePath(path); + sink << (deriver ? store.printStorePath(*deriver) : "") + << narHash.to_string(Base16, false); + worker_proto::write(store, sink, references); + sink << registrationTime << narSize; + if (format >= 16) { + sink << ultimate + << sigs + << renderContentAddress(ca); + } +} + +} diff --git a/src/libstore/path-info.hh b/src/libstore/path-info.hh index de87f8b33..b4b54e593 100644 --- a/src/libstore/path-info.hh +++ b/src/libstore/path-info.hh @@ -105,6 +105,11 @@ struct ValidPathInfo ValidPathInfo(const StorePath & path, Hash narHash) : path(path), narHash(narHash) { }; virtual ~ValidPathInfo() { } + + static ValidPathInfo read(Source & source, const Store & store, unsigned int format); + static ValidPathInfo read(Source & source, const Store & store, unsigned int format, StorePath && path); + + void write(Sink & sink, const Store & store, unsigned int format, bool includePath = true) const; }; typedef std::map ValidPathInfos; diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index 1471520f3..140f39120 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -386,23 +386,6 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S } -ref RemoteStore::readValidPathInfo(ConnectionHandle & conn, const StorePath & path) -{ - auto deriver = readString(conn->from); - auto narHash = Hash::parseAny(readString(conn->from), htSHA256); - auto info = make_ref(path, narHash); - if (deriver != "") info->deriver = parseStorePath(deriver); - info->references = worker_proto::read(*this, conn->from, Phantom {}); - conn->from >> info->registrationTime >> info->narSize; - if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) { - conn->from >> info->ultimate; - info->sigs = readStrings(conn->from); - info->ca = parseContentAddressOpt(readString(conn->from)); - } - return info; -} - - void RemoteStore::queryPathInfoUncached(const StorePath & path, Callback> callback) noexcept { @@ -423,7 +406,8 @@ void RemoteStore::queryPathInfoUncached(const StorePath & path, bool valid; conn->from >> valid; if (!valid) throw InvalidPath("path '%s' is not valid", printStorePath(path)); } - info = readValidPathInfo(conn, path); + info = std::make_shared( + ValidPathInfo::read(conn->from, *this, GET_PROTOCOL_MINOR(conn->daemonVersion), StorePath{path})); } callback(std::move(info)); } catch (...) { callback.rethrow(); } @@ -525,8 +509,8 @@ ref RemoteStore::addCAToStore( }); } - auto path = parseStorePath(readString(conn->from)); - return readValidPathInfo(conn, path); + return make_ref( + ValidPathInfo::read(conn->from, *this, GET_PROTOCOL_MINOR(conn->daemonVersion))); } else { if (repair) throw Error("repairing is not supported when building through the Nix daemon protocol < 1.25"); @@ -642,6 +626,25 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source, } +void RemoteStore::addMultipleToStore( + Source & source, + RepairFlag repair, + CheckSigsFlag checkSigs) +{ + if (GET_PROTOCOL_MINOR(getConnection()->daemonVersion) >= 32) { + auto conn(getConnection()); + conn->to + << wopAddMultipleToStore + << repair + << !checkSigs; + conn.withFramedSink([&](Sink & sink) { + source.drainInto(sink); + }); + } else + Store::addMultipleToStore(source, repair, checkSigs); +} + + StorePath RemoteStore::addTextToStore(const string & name, const string & s, const StorePathSet & references, RepairFlag repair) { @@ -885,16 +888,6 @@ void RemoteStore::queryMissing(const std::vector & targets, } -StorePaths RemoteStore::importPaths(Source & source, CheckSigsFlag checkSigs) -{ - auto conn(getConnection()); - conn->to << wopImportPaths2; - source.drainInto(conn->to); - conn.processStderr(); - return worker_proto::read(*this, conn->from, Phantom {}); -} - - void RemoteStore::connect() { auto conn(getConnection()); @@ -1021,14 +1014,14 @@ std::exception_ptr RemoteStore::Connection::processStderr(Sink * sink, Source * return nullptr; } -void ConnectionHandle::withFramedSink(std::function fun) +void ConnectionHandle::withFramedSink(std::function fun) { (*this)->to.flush(); std::exception_ptr ex; - /* Handle log messages / exceptions from the remote on a - separate thread. */ + /* Handle log messages / exceptions from the remote on a separate + thread. */ std::thread stderrThread([&]() { try { @@ -1061,7 +1054,6 @@ void ConnectionHandle::withFramedSink(std::function fun) stderrThread.join(); if (ex) std::rethrow_exception(ex); - } } diff --git a/src/libstore/remote-store.hh b/src/libstore/remote-store.hh index fbec40b4b..8901c79fc 100644 --- a/src/libstore/remote-store.hh +++ b/src/libstore/remote-store.hh @@ -78,6 +78,11 @@ public: void addToStore(const ValidPathInfo & info, Source & nar, RepairFlag repair, CheckSigsFlag checkSigs) override; + void addMultipleToStore( + Source & source, + RepairFlag repair, + CheckSigsFlag checkSigs) override; + StorePath addTextToStore(const string & name, const string & s, const StorePathSet & references, RepairFlag repair) override; @@ -112,8 +117,6 @@ public: StorePathSet & willBuild, StorePathSet & willSubstitute, StorePathSet & unknown, uint64_t & downloadSize, uint64_t & narSize) override; - StorePaths importPaths(Source & source, CheckSigsFlag checkSigs) override; - void connect() override; unsigned int getProtocol() override; @@ -153,8 +156,6 @@ protected: virtual void narFromPath(const StorePath & path, Sink & sink) override; - ref readValidPathInfo(ConnectionHandle & conn, const StorePath & path); - private: std::atomic_bool failed{false}; diff --git a/src/libstore/store-api.cc b/src/libstore/store-api.cc index 230108f6e..970bafd88 100644 --- a/src/libstore/store-api.cc +++ b/src/libstore/store-api.cc @@ -250,6 +250,20 @@ StorePath Store::addToStore(const string & name, const Path & _srcPath, } +void Store::addMultipleToStore( + Source & source, + RepairFlag repair, + CheckSigsFlag checkSigs) +{ + auto expected = readNum(source); + for (uint64_t i = 0; i < expected; ++i) { + auto info = ValidPathInfo::read(source, *this, 16); + info.ultimate = false; + addToStore(info, source, repair, checkSigs); + } +} + + /* The aim of this function is to compute in one pass the correct ValidPathInfo for the files that we are trying to add to the store. To accomplish that in one @@ -771,6 +785,19 @@ const Store::Stats & Store::getStats() } +static std::string makeCopyPathMessage( + std::string_view srcUri, + std::string_view dstUri, + std::string_view storePath) +{ + return srcUri == "local" || srcUri == "daemon" + ? fmt("copying path '%s' to '%s'", storePath, dstUri) + : dstUri == "local" || dstUri == "daemon" + ? fmt("copying path '%s' from '%s'", storePath, srcUri) + : fmt("copying path '%s' from '%s' to '%s'", storePath, srcUri, dstUri); +} + + void copyStorePath( Store & srcStore, Store & dstStore, @@ -780,14 +807,10 @@ void copyStorePath( { auto srcUri = srcStore.getUri(); auto dstUri = dstStore.getUri(); - + auto storePathS = srcStore.printStorePath(storePath); Activity act(*logger, lvlInfo, actCopyPath, - srcUri == "local" || srcUri == "daemon" - ? fmt("copying path '%s' to '%s'", srcStore.printStorePath(storePath), dstUri) - : dstUri == "local" || dstUri == "daemon" - ? fmt("copying path '%s' from '%s'", srcStore.printStorePath(storePath), srcUri) - : fmt("copying path '%s' from '%s' to '%s'", srcStore.printStorePath(storePath), srcUri, dstUri), - {srcStore.printStorePath(storePath), srcUri, dstUri}); + makeCopyPathMessage(srcUri, dstUri, storePathS), + {storePathS, srcUri, dstUri}); PushActivity pact(act.id); auto info = srcStore.queryPathInfo(storePath); @@ -896,19 +919,31 @@ std::map copyPaths( for (auto & path : storePaths) pathsMap.insert_or_assign(path, path); - // FIXME: Temporary hack to copy closures in a single round-trip. - if (dynamic_cast(&dstStore)) { - if (!missing.empty()) { - auto source = sinkToSource([&](Sink & sink) { - srcStore.exportPaths(missing, sink); - }); - dstStore.importPaths(*source, NoCheckSigs); - } - return pathsMap; - } - Activity act(*logger, lvlInfo, actCopyPaths, fmt("copying %d paths", missing.size())); + auto sorted = srcStore.topoSortPaths(missing); + std::reverse(sorted.begin(), sorted.end()); + + auto source = sinkToSource([&](Sink & sink) { + sink << sorted.size(); + for (auto & storePath : sorted) { + auto srcUri = srcStore.getUri(); + auto dstUri = dstStore.getUri(); + auto storePathS = srcStore.printStorePath(storePath); + Activity act(*logger, lvlInfo, actCopyPath, + makeCopyPathMessage(srcUri, dstUri, storePathS), + {storePathS, srcUri, dstUri}); + PushActivity pact(act.id); + + auto info = srcStore.queryPathInfo(storePath); + info->write(sink, srcStore, 16); + srcStore.narFromPath(storePath, sink); + } + }); + + dstStore.addMultipleToStore(*source, repair, checkSigs); + + #if 0 std::atomic nrDone{0}; std::atomic nrFailed{0}; std::atomic bytesExpected{0}; @@ -986,6 +1021,8 @@ std::map copyPaths( nrDone++; showProgress(); }); + #endif + return pathsMap; } diff --git a/src/libstore/store-api.hh b/src/libstore/store-api.hh index 3ad74f35c..4ed2fd151 100644 --- a/src/libstore/store-api.hh +++ b/src/libstore/store-api.hh @@ -440,6 +440,12 @@ public: virtual void addToStore(const ValidPathInfo & info, Source & narSource, RepairFlag repair = NoRepair, CheckSigsFlag checkSigs = CheckSigs) = 0; + /* Import multiple paths into the store. */ + virtual void addMultipleToStore( + Source & source, + RepairFlag repair = NoRepair, + CheckSigsFlag checkSigs = CheckSigs); + /* Copy the contents of a path to the store and register the validity the resulting path. The resulting path is returned. The function object `filter' can be used to exclude files (see diff --git a/src/libstore/worker-protocol.hh b/src/libstore/worker-protocol.hh index b1888bf01..93cf546d2 100644 --- a/src/libstore/worker-protocol.hh +++ b/src/libstore/worker-protocol.hh @@ -9,7 +9,7 @@ namespace nix { #define WORKER_MAGIC_1 0x6e697863 #define WORKER_MAGIC_2 0x6478696f -#define PROTOCOL_VERSION (1 << 8 | 31) +#define PROTOCOL_VERSION (1 << 8 | 32) #define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00) #define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff) @@ -55,7 +55,7 @@ typedef enum { wopQueryDerivationOutputMap = 41, wopRegisterDrvOutput = 42, wopQueryRealisation = 43, - wopImportPaths2 = 44, // hack + wopAddMultipleToStore = 44, } WorkerOp;