From 7c2fef0a819481058d49c469c115bb0668b7016b Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Mon, 13 Jul 2020 20:07:19 +0200 Subject: [PATCH] Make 'nix copy' to s3:// binary caches run in constant memory --- src/libstore/binary-cache-store.cc | 11 +++---- src/libstore/binary-cache-store.hh | 4 +-- src/libstore/http-binary-cache-store.cc | 4 +-- src/libstore/local-binary-cache-store.cc | 5 ++-- src/libstore/s3-binary-cache-store.cc | 37 ++++++++++++++---------- src/libutil/serialise.hh | 23 +++++++++++++++ 6 files changed, 57 insertions(+), 27 deletions(-) diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc index e71240558..b791c125b 100644 --- a/src/libstore/binary-cache-store.cc +++ b/src/libstore/binary-cache-store.cc @@ -15,6 +15,7 @@ #include #include #include +#include #include @@ -58,11 +59,10 @@ void BinaryCacheStore::init() } void BinaryCacheStore::upsertFile(const std::string & path, - const std::string & data, + std::string && data, const std::string & mimeType) { - StringSource source(data); - upsertFile(path, source, mimeType); + upsertFile(path, std::make_shared(std::move(data)), mimeType); } void BinaryCacheStore::getFile(const std::string & path, @@ -279,8 +279,9 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource /* Atomically write the NAR file. */ if (repair || !fileExists(narInfo->url)) { stats.narWrite++; - FileSource source(fnTemp); - upsertFile(narInfo->url, source, "application/x-nix-nar"); + upsertFile(narInfo->url, + std::make_shared(fnTemp, std::ios_base::in), + "application/x-nix-nar"); } else stats.narWriteAverted++; diff --git a/src/libstore/binary-cache-store.hh b/src/libstore/binary-cache-store.hh index 2bf8d56b4..9bcdf5901 100644 --- a/src/libstore/binary-cache-store.hh +++ b/src/libstore/binary-cache-store.hh @@ -36,11 +36,11 @@ public: virtual bool fileExists(const std::string & path) = 0; virtual void upsertFile(const std::string & path, - Source & source, + std::shared_ptr> istream, const std::string & mimeType) = 0; void upsertFile(const std::string & path, - const std::string & data, + std::string && data, const std::string & mimeType); /* Note: subclasses must implement at least one of the two diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc index d9a292368..c1ceb08cf 100644 --- a/src/libstore/http-binary-cache-store.cc +++ b/src/libstore/http-binary-cache-store.cc @@ -100,11 +100,11 @@ protected: } void upsertFile(const std::string & path, - Source & source, + std::shared_ptr> istream, const std::string & mimeType) override { auto req = FileTransferRequest(cacheUri + "/" + path); - req.data = std::make_shared(source.drain()); + req.data = std::make_shared(StreamToSourceAdapter(istream).drain()); req.mimeType = mimeType; try { getFileTransfer()->upload(req); diff --git a/src/libstore/local-binary-cache-store.cc b/src/libstore/local-binary-cache-store.cc index 3d531d3a7..87d8334d7 100644 --- a/src/libstore/local-binary-cache-store.cc +++ b/src/libstore/local-binary-cache-store.cc @@ -31,12 +31,13 @@ protected: bool fileExists(const std::string & path) override; void upsertFile(const std::string & path, - Source & source, - const std::string & mimeType) + std::shared_ptr> istream, + const std::string & mimeType) override { auto path2 = binaryCacheDir + "/" + path; Path tmp = path2 + ".tmp." + std::to_string(getpid()); AutoDelete del(tmp, false); + StreamToSourceAdapter source(istream); writeFile(tmp, source); if (rename(tmp.c_str(), path2.c_str())) throw SysError("renaming '%1%' to '%2%'", tmp, path2); diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index 31ad4a3be..1b7dff085 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -261,12 +261,11 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore std::shared_ptr transferManager; std::once_flag transferManagerCreated; - void uploadFile(const std::string & path, const std::string & data, + void uploadFile(const std::string & path, + std::shared_ptr> istream, const std::string & mimeType, const std::string & contentEncoding) { - auto stream = std::make_shared(data); - auto maxThreads = std::thread::hardware_concurrency(); static std::shared_ptr @@ -306,7 +305,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore std::shared_ptr transferHandle = transferManager->UploadFile( - stream, bucketName, path, mimeType, + istream, bucketName, path, mimeType, Aws::Map(), nullptr /*, contentEncoding */); @@ -332,9 +331,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore if (contentEncoding != "") request.SetContentEncoding(contentEncoding); - auto stream = std::make_shared(data); - - request.SetBody(stream); + request.SetBody(istream); auto result = checkAws(fmt("AWS error uploading '%s'", path), s3Helper.client->PutObject(request)); @@ -346,26 +343,34 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore std::chrono::duration_cast(now2 - now1) .count(); - printInfo(format("uploaded 's3://%1%/%2%' (%3% bytes) in %4% ms") % - bucketName % path % data.size() % duration); + auto size = istream->tellg(); + + printInfo("uploaded 's3://%s/%s' (%d bytes) in %d ms", + bucketName, path, size, duration); stats.putTimeMs += duration; - stats.putBytes += data.size(); + stats.putBytes += size; stats.put++; } - void upsertFile(const std::string & path, Source & source, + void upsertFile(const std::string & path, + std::shared_ptr> istream, const std::string & mimeType) override { - auto data = source.drain(); + auto compress = [&](std::string compression) + { + auto compressed = nix::compress(compression, StreamToSourceAdapter(istream).drain()); + return std::make_shared(std::move(*compressed)); + }; + if (narinfoCompression != "" && hasSuffix(path, ".narinfo")) - uploadFile(path, *compress(narinfoCompression, data), mimeType, narinfoCompression); + uploadFile(path, compress(narinfoCompression), mimeType, narinfoCompression); else if (lsCompression != "" && hasSuffix(path, ".ls")) - uploadFile(path, *compress(lsCompression, data), mimeType, lsCompression); + uploadFile(path, compress(lsCompression), mimeType, lsCompression); else if (logCompression != "" && hasPrefix(path, "log/")) - uploadFile(path, *compress(logCompression, data), mimeType, logCompression); + uploadFile(path, compress(logCompression), mimeType, logCompression); else - uploadFile(path, data, mimeType, ""); + uploadFile(path, istream, mimeType, ""); } void getFile(const std::string & path, Sink & sink) override diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh index 84a4eb001..8386a4991 100644 --- a/src/libutil/serialise.hh +++ b/src/libutil/serialise.hh @@ -349,4 +349,27 @@ Source & operator >> (Source & in, bool & b) } +/* An adapter that converts a std::basic_istream into a source. */ +struct StreamToSourceAdapter : Source +{ + std::shared_ptr> istream; + + StreamToSourceAdapter(std::shared_ptr> istream) + : istream(istream) + { } + + size_t read(unsigned char * data, size_t len) override + { + if (!istream->read((char *) data, len)) { + if (istream->eof()) { + if (istream->gcount() == 0) + throw EndOfFile("end of file"); + } else + throw Error("I/O error in StreamToSourceAdapter"); + } + return istream->gcount(); + } +}; + + }