forked from lix-project/lix
Make 'nix copy' to s3:// binary caches run in constant memory
This commit is contained in:
parent
493961b689
commit
7c2fef0a81
6 changed files with 57 additions and 27 deletions
|
@ -15,6 +15,7 @@
|
|||
#include <chrono>
|
||||
#include <future>
|
||||
#include <regex>
|
||||
#include <fstream>
|
||||
|
||||
#include <nlohmann/json.hpp>
|
||||
|
||||
|
@ -58,11 +59,10 @@ void BinaryCacheStore::init()
|
|||
}
|
||||
|
||||
void BinaryCacheStore::upsertFile(const std::string & path,
|
||||
const std::string & data,
|
||||
std::string && data,
|
||||
const std::string & mimeType)
|
||||
{
|
||||
StringSource source(data);
|
||||
upsertFile(path, source, mimeType);
|
||||
upsertFile(path, std::make_shared<std::stringstream>(std::move(data)), mimeType);
|
||||
}
|
||||
|
||||
void BinaryCacheStore::getFile(const std::string & path,
|
||||
|
@ -279,8 +279,9 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource
|
|||
/* Atomically write the NAR file. */
|
||||
if (repair || !fileExists(narInfo->url)) {
|
||||
stats.narWrite++;
|
||||
FileSource source(fnTemp);
|
||||
upsertFile(narInfo->url, source, "application/x-nix-nar");
|
||||
upsertFile(narInfo->url,
|
||||
std::make_shared<std::fstream>(fnTemp, std::ios_base::in),
|
||||
"application/x-nix-nar");
|
||||
} else
|
||||
stats.narWriteAverted++;
|
||||
|
||||
|
|
|
@ -36,11 +36,11 @@ public:
|
|||
virtual bool fileExists(const std::string & path) = 0;
|
||||
|
||||
virtual void upsertFile(const std::string & path,
|
||||
Source & source,
|
||||
std::shared_ptr<std::basic_iostream<char>> istream,
|
||||
const std::string & mimeType) = 0;
|
||||
|
||||
void upsertFile(const std::string & path,
|
||||
const std::string & data,
|
||||
std::string && data,
|
||||
const std::string & mimeType);
|
||||
|
||||
/* Note: subclasses must implement at least one of the two
|
||||
|
|
|
@ -100,11 +100,11 @@ protected:
|
|||
}
|
||||
|
||||
void upsertFile(const std::string & path,
|
||||
Source & source,
|
||||
std::shared_ptr<std::basic_iostream<char>> istream,
|
||||
const std::string & mimeType) override
|
||||
{
|
||||
auto req = FileTransferRequest(cacheUri + "/" + path);
|
||||
req.data = std::make_shared<string>(source.drain());
|
||||
req.data = std::make_shared<string>(StreamToSourceAdapter(istream).drain());
|
||||
req.mimeType = mimeType;
|
||||
try {
|
||||
getFileTransfer()->upload(req);
|
||||
|
|
|
@ -31,12 +31,13 @@ protected:
|
|||
bool fileExists(const std::string & path) override;
|
||||
|
||||
void upsertFile(const std::string & path,
|
||||
Source & source,
|
||||
const std::string & mimeType)
|
||||
std::shared_ptr<std::basic_iostream<char>> istream,
|
||||
const std::string & mimeType) override
|
||||
{
|
||||
auto path2 = binaryCacheDir + "/" + path;
|
||||
Path tmp = path2 + ".tmp." + std::to_string(getpid());
|
||||
AutoDelete del(tmp, false);
|
||||
StreamToSourceAdapter source(istream);
|
||||
writeFile(tmp, source);
|
||||
if (rename(tmp.c_str(), path2.c_str()))
|
||||
throw SysError("renaming '%1%' to '%2%'", tmp, path2);
|
||||
|
|
|
@ -261,12 +261,11 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
|
|||
std::shared_ptr<TransferManager> transferManager;
|
||||
std::once_flag transferManagerCreated;
|
||||
|
||||
void uploadFile(const std::string & path, const std::string & data,
|
||||
void uploadFile(const std::string & path,
|
||||
std::shared_ptr<std::basic_iostream<char>> istream,
|
||||
const std::string & mimeType,
|
||||
const std::string & contentEncoding)
|
||||
{
|
||||
auto stream = std::make_shared<std::stringstream>(data);
|
||||
|
||||
auto maxThreads = std::thread::hardware_concurrency();
|
||||
|
||||
static std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>
|
||||
|
@ -306,7 +305,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
|
|||
|
||||
std::shared_ptr<TransferHandle> transferHandle =
|
||||
transferManager->UploadFile(
|
||||
stream, bucketName, path, mimeType,
|
||||
istream, bucketName, path, mimeType,
|
||||
Aws::Map<Aws::String, Aws::String>(),
|
||||
nullptr /*, contentEncoding */);
|
||||
|
||||
|
@ -332,9 +331,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
|
|||
if (contentEncoding != "")
|
||||
request.SetContentEncoding(contentEncoding);
|
||||
|
||||
auto stream = std::make_shared<std::stringstream>(data);
|
||||
|
||||
request.SetBody(stream);
|
||||
request.SetBody(istream);
|
||||
|
||||
auto result = checkAws(fmt("AWS error uploading '%s'", path),
|
||||
s3Helper.client->PutObject(request));
|
||||
|
@ -346,26 +343,34 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
|
|||
std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1)
|
||||
.count();
|
||||
|
||||
printInfo(format("uploaded 's3://%1%/%2%' (%3% bytes) in %4% ms") %
|
||||
bucketName % path % data.size() % duration);
|
||||
auto size = istream->tellg();
|
||||
|
||||
printInfo("uploaded 's3://%s/%s' (%d bytes) in %d ms",
|
||||
bucketName, path, size, duration);
|
||||
|
||||
stats.putTimeMs += duration;
|
||||
stats.putBytes += data.size();
|
||||
stats.putBytes += size;
|
||||
stats.put++;
|
||||
}
|
||||
|
||||
void upsertFile(const std::string & path, Source & source,
|
||||
void upsertFile(const std::string & path,
|
||||
std::shared_ptr<std::basic_iostream<char>> istream,
|
||||
const std::string & mimeType) override
|
||||
{
|
||||
auto data = source.drain();
|
||||
auto compress = [&](std::string compression)
|
||||
{
|
||||
auto compressed = nix::compress(compression, StreamToSourceAdapter(istream).drain());
|
||||
return std::make_shared<std::stringstream>(std::move(*compressed));
|
||||
};
|
||||
|
||||
if (narinfoCompression != "" && hasSuffix(path, ".narinfo"))
|
||||
uploadFile(path, *compress(narinfoCompression, data), mimeType, narinfoCompression);
|
||||
uploadFile(path, compress(narinfoCompression), mimeType, narinfoCompression);
|
||||
else if (lsCompression != "" && hasSuffix(path, ".ls"))
|
||||
uploadFile(path, *compress(lsCompression, data), mimeType, lsCompression);
|
||||
uploadFile(path, compress(lsCompression), mimeType, lsCompression);
|
||||
else if (logCompression != "" && hasPrefix(path, "log/"))
|
||||
uploadFile(path, *compress(logCompression, data), mimeType, logCompression);
|
||||
uploadFile(path, compress(logCompression), mimeType, logCompression);
|
||||
else
|
||||
uploadFile(path, data, mimeType, "");
|
||||
uploadFile(path, istream, mimeType, "");
|
||||
}
|
||||
|
||||
void getFile(const std::string & path, Sink & sink) override
|
||||
|
|
|
@ -349,4 +349,27 @@ Source & operator >> (Source & in, bool & b)
|
|||
}
|
||||
|
||||
|
||||
/* An adapter that converts a std::basic_istream into a source. */
|
||||
struct StreamToSourceAdapter : Source
|
||||
{
|
||||
std::shared_ptr<std::basic_istream<char>> istream;
|
||||
|
||||
StreamToSourceAdapter(std::shared_ptr<std::basic_istream<char>> istream)
|
||||
: istream(istream)
|
||||
{ }
|
||||
|
||||
size_t read(unsigned char * data, size_t len) override
|
||||
{
|
||||
if (!istream->read((char *) data, len)) {
|
||||
if (istream->eof()) {
|
||||
if (istream->gcount() == 0)
|
||||
throw EndOfFile("end of file");
|
||||
} else
|
||||
throw Error("I/O error in StreamToSourceAdapter");
|
||||
}
|
||||
return istream->gcount();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue