From fc84c358d9e55e9ba1d939d8974f6deef629848e Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Fri, 10 Jul 2020 20:58:02 +0200 Subject: [PATCH] Make 'nix copy' to file:// binary caches run in constant memory --- src/libstore/binary-cache-store.cc | 112 +++++++++++++++-------- src/libstore/binary-cache-store.hh | 6 +- src/libstore/daemon.cc | 2 +- src/libstore/export-import.cc | 33 ++----- src/libstore/http-binary-cache-store.cc | 4 +- src/libstore/local-binary-cache-store.cc | 30 +++--- src/libstore/s3-binary-cache-store.cc | 3 +- src/libutil/archive.hh | 4 +- src/libutil/serialise.hh | 13 +++ 9 files changed, 120 insertions(+), 87 deletions(-) diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc index 9f52ddafa..166041b6c 100644 --- a/src/libstore/binary-cache-store.cc +++ b/src/libstore/binary-cache-store.cc @@ -57,6 +57,14 @@ void BinaryCacheStore::init() } } +void BinaryCacheStore::upsertFile(const std::string & path, + const std::string & data, + const std::string & mimeType) +{ + StringSource source(data); + upsertFile(path, source, mimeType); +} + void BinaryCacheStore::getFile(const std::string & path, Callback> callback) noexcept { @@ -113,13 +121,70 @@ void BinaryCacheStore::writeNarInfo(ref narInfo) diskCache->upsertNarInfo(getUri(), hashPart, std::shared_ptr(narInfo)); } +AutoCloseFD openFile(const Path & path) +{ + auto fd = open(path.c_str(), O_RDONLY | O_CLOEXEC); + if (!fd) + throw SysError("opening file '%1%'", path); + return fd; +} + +struct FileSource : FdSource +{ + AutoCloseFD fd2; + + FileSource(const Path & path) + : fd2(openFile(path)) + { + fd = fd2.get(); + } +}; + void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource, RepairFlag repair, CheckSigsFlag checkSigs, std::shared_ptr accessor) { - // FIXME: See if we can use the original source to reduce memory usage. - auto nar = make_ref(narSource.drain()); + assert(info.narHash && info.narSize); - if (!repair && isValidPath(info.path)) return; + if (!repair && isValidPath(info.path)) { + // FIXME: copyNAR -> null sink + narSource.drain(); + return; + } + + auto [fdTemp, fnTemp] = createTempFile(); + + auto now1 = std::chrono::steady_clock::now(); + + HashSink fileHashSink(htSHA256); + + { + FdSink fileSink(fdTemp.get()); + TeeSink teeSink(fileSink, fileHashSink); + auto compressionSink = makeCompressionSink(compression, teeSink); + copyNAR(narSource, *compressionSink); + compressionSink->finish(); + } + + auto now2 = std::chrono::steady_clock::now(); + + auto narInfo = make_ref(info); + narInfo->narSize = info.narSize; + narInfo->narHash = info.narHash; + narInfo->compression = compression; + auto [fileHash, fileSize] = fileHashSink.finish(); + narInfo->fileHash = fileHash; + narInfo->fileSize = fileSize; + narInfo->url = "nar/" + narInfo->fileHash.to_string(Base32, false) + ".nar" + + (compression == "xz" ? ".xz" : + compression == "bzip2" ? ".bz2" : + compression == "br" ? ".br" : + ""); + + auto duration = std::chrono::duration_cast(now2 - now1).count(); + printMsg(lvlTalkative, "copying path '%1%' (%2% bytes, compressed %3$.1f%% in %4% ms) to binary cache", + printStorePath(narInfo->path), info.narSize, + ((1.0 - (double) fileSize / info.narSize) * 100.0), + duration); /* Verify that all references are valid. This may do some .narinfo reads, but typically they'll already be cached. */ @@ -132,16 +197,7 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource printStorePath(info.path), printStorePath(ref)); } - assert(nar->compare(0, narMagic.size(), narMagic) == 0); - - auto narInfo = make_ref(info); - - narInfo->narSize = nar->size(); - narInfo->narHash = hashString(htSHA256, *nar); - - if (info.narHash && info.narHash != narInfo->narHash) - throw Error("refusing to copy corrupted path '%1%' to binary cache", printStorePath(info.path)); - + #if 0 auto accessor_ = std::dynamic_pointer_cast(accessor); auto narAccessor = makeNarAccessor(nar); @@ -166,27 +222,9 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource upsertFile(std::string(info.path.to_string()) + ".ls", jsonOut.str(), "application/json"); } + #endif - /* Compress the NAR. */ - narInfo->compression = compression; - auto now1 = std::chrono::steady_clock::now(); - auto narCompressed = compress(compression, *nar, parallelCompression); - auto now2 = std::chrono::steady_clock::now(); - narInfo->fileHash = hashString(htSHA256, *narCompressed); - narInfo->fileSize = narCompressed->size(); - - auto duration = std::chrono::duration_cast(now2 - now1).count(); - printMsg(lvlTalkative, "copying path '%1%' (%2% bytes, compressed %3$.1f%% in %4% ms) to binary cache", - printStorePath(narInfo->path), narInfo->narSize, - ((1.0 - (double) narCompressed->size() / nar->size()) * 100.0), - duration); - - narInfo->url = "nar/" + narInfo->fileHash.to_string(Base32, false) + ".nar" - + (compression == "xz" ? ".xz" : - compression == "bzip2" ? ".bz2" : - compression == "br" ? ".br" : - ""); - + #if 0 /* Optionally maintain an index of DWARF debug info files consisting of JSON files named 'debuginfo/' that specify the NAR file and member containing the debug info. */ @@ -243,16 +281,18 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource threadPool.process(); } } + #endif /* Atomically write the NAR file. */ if (repair || !fileExists(narInfo->url)) { stats.narWrite++; - upsertFile(narInfo->url, *narCompressed, "application/x-nix-nar"); + FileSource source(fnTemp); + upsertFile(narInfo->url, source, "application/x-nix-nar"); } else stats.narWriteAverted++; - stats.narWriteBytes += nar->size(); - stats.narWriteCompressedBytes += narCompressed->size(); + stats.narWriteBytes += info.narSize; + stats.narWriteCompressedBytes += fileSize; stats.narWriteCompressionTimeMs += duration; /* Atomically write the NAR info file.*/ diff --git a/src/libstore/binary-cache-store.hh b/src/libstore/binary-cache-store.hh index 52ef8aa7a..4f0379533 100644 --- a/src/libstore/binary-cache-store.hh +++ b/src/libstore/binary-cache-store.hh @@ -36,9 +36,13 @@ public: virtual bool fileExists(const std::string & path) = 0; virtual void upsertFile(const std::string & path, - const std::string & data, + Source & source, const std::string & mimeType) = 0; + void upsertFile(const std::string & path, + const std::string & data, + const std::string & mimeType); + /* Note: subclasses must implement at least one of the two following getFile() methods. */ diff --git a/src/libstore/daemon.cc b/src/libstore/daemon.cc index ebc4d0285..f05f4739d 100644 --- a/src/libstore/daemon.cc +++ b/src/libstore/daemon.cc @@ -731,7 +731,7 @@ static void performOp(TunnelLogger * logger, ref store, if (GET_PROTOCOL_MINOR(clientVersion) >= 21) source = std::make_unique(from, to); else { - TeeSink tee(from); + TeeParseSink tee(from); parseDump(tee, tee.source); saved = std::move(*tee.source.data); source = std::make_unique(saved); diff --git a/src/libstore/export-import.cc b/src/libstore/export-import.cc index 57b7e9590..cfed8ccd8 100644 --- a/src/libstore/export-import.cc +++ b/src/libstore/export-import.cc @@ -7,24 +7,6 @@ namespace nix { -struct HashAndWriteSink : Sink -{ - Sink & writeSink; - HashSink hashSink; - HashAndWriteSink(Sink & writeSink) : writeSink(writeSink), hashSink(htSHA256) - { - } - virtual void operator () (const unsigned char * data, size_t len) - { - writeSink(data, len); - hashSink(data, len); - } - Hash currentHash() - { - return hashSink.currentHash().first; - } -}; - void Store::exportPaths(const StorePathSet & paths, Sink & sink) { auto sorted = topoSortPaths(paths); @@ -47,23 +29,24 @@ void Store::exportPath(const StorePath & path, Sink & sink) { auto info = queryPathInfo(path); - HashAndWriteSink hashAndWriteSink(sink); + HashSink hashSink(htSHA256); + TeeSink teeSink(sink, hashSink); - narFromPath(path, hashAndWriteSink); + narFromPath(path, teeSink); /* Refuse to export paths that have changed. This prevents filesystem corruption from spreading to other machines. Don't complain if the stored hash is zero (unknown). */ - Hash hash = hashAndWriteSink.currentHash(); + Hash hash = hashSink.currentHash().first; if (hash != info->narHash && info->narHash != Hash(*info->narHash.type)) throw Error("hash of path '%s' has changed from '%s' to '%s'!", printStorePath(path), info->narHash.to_string(Base32, true), hash.to_string(Base32, true)); - hashAndWriteSink + teeSink << exportMagic << printStorePath(path); - writeStorePaths(*this, hashAndWriteSink, info->references); - hashAndWriteSink + writeStorePaths(*this, teeSink, info->references); + teeSink << (info->deriver ? printStorePath(*info->deriver) : "") << 0; } @@ -77,7 +60,7 @@ StorePaths Store::importPaths(Source & source, std::shared_ptr acces if (n != 1) throw Error("input doesn't look like something created by 'nix-store --export'"); /* Extract the NAR from the source. */ - TeeSink tee(source); + TeeParseSink tee(source); parseDump(tee, tee.source); uint32_t magic = readInt(source); diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc index 451a64785..d9a292368 100644 --- a/src/libstore/http-binary-cache-store.cc +++ b/src/libstore/http-binary-cache-store.cc @@ -100,11 +100,11 @@ protected: } void upsertFile(const std::string & path, - const std::string & data, + Source & source, const std::string & mimeType) override { auto req = FileTransferRequest(cacheUri + "/" + path); - req.data = std::make_shared(data); // FIXME: inefficient + req.data = std::make_shared(source.drain()); req.mimeType = mimeType; try { getFileTransfer()->upload(req); diff --git a/src/libstore/local-binary-cache-store.cc b/src/libstore/local-binary-cache-store.cc index 215c016f5..3d531d3a7 100644 --- a/src/libstore/local-binary-cache-store.cc +++ b/src/libstore/local-binary-cache-store.cc @@ -31,8 +31,17 @@ protected: bool fileExists(const std::string & path) override; void upsertFile(const std::string & path, - const std::string & data, - const std::string & mimeType) override; + Source & source, + const std::string & mimeType) + { + auto path2 = binaryCacheDir + "/" + path; + Path tmp = path2 + ".tmp." + std::to_string(getpid()); + AutoDelete del(tmp, false); + writeFile(tmp, source); + if (rename(tmp.c_str(), path2.c_str())) + throw SysError("renaming '%1%' to '%2%'", tmp, path2); + del.cancel(); + } void getFile(const std::string & path, Sink & sink) override { @@ -70,28 +79,11 @@ void LocalBinaryCacheStore::init() BinaryCacheStore::init(); } -static void atomicWrite(const Path & path, const std::string & s) -{ - Path tmp = path + ".tmp." + std::to_string(getpid()); - AutoDelete del(tmp, false); - writeFile(tmp, s); - if (rename(tmp.c_str(), path.c_str())) - throw SysError("renaming '%1%' to '%2%'", tmp, path); - del.cancel(); -} - bool LocalBinaryCacheStore::fileExists(const std::string & path) { return pathExists(binaryCacheDir + "/" + path); } -void LocalBinaryCacheStore::upsertFile(const std::string & path, - const std::string & data, - const std::string & mimeType) -{ - atomicWrite(binaryCacheDir + "/" + path, data); -} - static RegisterStoreImplementation regStore([]( const std::string & uri, const Store::Params & params) -> std::shared_ptr diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index f85563766..57f16101d 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -355,9 +355,10 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore stats.put++; } - void upsertFile(const std::string & path, const std::string & data, + void upsertFile(const std::string & path, Source & source, const std::string & mimeType) override { + auto data = source.drain(); if (narinfoCompression != "" && hasSuffix(path, ".narinfo")) uploadFile(path, *compress(narinfoCompression, data), mimeType, narinfoCompression); else if (lsCompression != "" && hasSuffix(path, ".ls")) diff --git a/src/libutil/archive.hh b/src/libutil/archive.hh index 768fe2536..795e9ce02 100644 --- a/src/libutil/archive.hh +++ b/src/libutil/archive.hh @@ -63,11 +63,11 @@ struct ParseSink virtual void createSymlink(const Path & path, const string & target) { }; }; -struct TeeSink : ParseSink +struct TeeParseSink : ParseSink { TeeSource source; - TeeSink(Source & source) : source(source) { } + TeeParseSink(Source & source) : source(source) { } }; void parseDump(ParseSink & sink, Source & source); diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh index a04118512..bd651fb7d 100644 --- a/src/libutil/serialise.hh +++ b/src/libutil/serialise.hh @@ -166,6 +166,19 @@ struct StringSource : Source }; +/* A sink that writes all incoming data to two other sinks. */ +struct TeeSink : Sink +{ + Sink & sink1, & sink2; + TeeSink(Sink & sink1, Sink & sink2) : sink1(sink1), sink2(sink2) { } + virtual void operator () (const unsigned char * data, size_t len) + { + sink1(data, len); + sink2(data, len); + } +}; + + /* Adapter class of a Source that saves all data read to `s'. */ struct TeeSource : Source {