Make 'nix copy' to file:// binary caches run in constant memory

This commit is contained in:
Eelco Dolstra 2020-07-10 20:58:02 +02:00
parent 400f1a9b59
commit fc84c358d9
9 changed files with 120 additions and 87 deletions

View file

@ -57,6 +57,14 @@ void BinaryCacheStore::init()
} }
} }
void BinaryCacheStore::upsertFile(const std::string & path,
const std::string & data,
const std::string & mimeType)
{
StringSource source(data);
upsertFile(path, source, mimeType);
}
void BinaryCacheStore::getFile(const std::string & path, void BinaryCacheStore::getFile(const std::string & path,
Callback<std::shared_ptr<std::string>> callback) noexcept Callback<std::shared_ptr<std::string>> callback) noexcept
{ {
@ -113,13 +121,70 @@ void BinaryCacheStore::writeNarInfo(ref<NarInfo> narInfo)
diskCache->upsertNarInfo(getUri(), hashPart, std::shared_ptr<NarInfo>(narInfo)); diskCache->upsertNarInfo(getUri(), hashPart, std::shared_ptr<NarInfo>(narInfo));
} }
AutoCloseFD openFile(const Path & path)
{
auto fd = open(path.c_str(), O_RDONLY | O_CLOEXEC);
if (!fd)
throw SysError("opening file '%1%'", path);
return fd;
}
struct FileSource : FdSource
{
AutoCloseFD fd2;
FileSource(const Path & path)
: fd2(openFile(path))
{
fd = fd2.get();
}
};
void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource, void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource,
RepairFlag repair, CheckSigsFlag checkSigs, std::shared_ptr<FSAccessor> accessor) RepairFlag repair, CheckSigsFlag checkSigs, std::shared_ptr<FSAccessor> accessor)
{ {
// FIXME: See if we can use the original source to reduce memory usage. assert(info.narHash && info.narSize);
auto nar = make_ref<std::string>(narSource.drain());
if (!repair && isValidPath(info.path)) return; if (!repair && isValidPath(info.path)) {
// FIXME: copyNAR -> null sink
narSource.drain();
return;
}
auto [fdTemp, fnTemp] = createTempFile();
auto now1 = std::chrono::steady_clock::now();
HashSink fileHashSink(htSHA256);
{
FdSink fileSink(fdTemp.get());
TeeSink teeSink(fileSink, fileHashSink);
auto compressionSink = makeCompressionSink(compression, teeSink);
copyNAR(narSource, *compressionSink);
compressionSink->finish();
}
auto now2 = std::chrono::steady_clock::now();
auto narInfo = make_ref<NarInfo>(info);
narInfo->narSize = info.narSize;
narInfo->narHash = info.narHash;
narInfo->compression = compression;
auto [fileHash, fileSize] = fileHashSink.finish();
narInfo->fileHash = fileHash;
narInfo->fileSize = fileSize;
narInfo->url = "nar/" + narInfo->fileHash.to_string(Base32, false) + ".nar"
+ (compression == "xz" ? ".xz" :
compression == "bzip2" ? ".bz2" :
compression == "br" ? ".br" :
"");
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
printMsg(lvlTalkative, "copying path '%1%' (%2% bytes, compressed %3$.1f%% in %4% ms) to binary cache",
printStorePath(narInfo->path), info.narSize,
((1.0 - (double) fileSize / info.narSize) * 100.0),
duration);
/* Verify that all references are valid. This may do some .narinfo /* Verify that all references are valid. This may do some .narinfo
reads, but typically they'll already be cached. */ reads, but typically they'll already be cached. */
@ -132,16 +197,7 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource
printStorePath(info.path), printStorePath(ref)); printStorePath(info.path), printStorePath(ref));
} }
assert(nar->compare(0, narMagic.size(), narMagic) == 0); #if 0
auto narInfo = make_ref<NarInfo>(info);
narInfo->narSize = nar->size();
narInfo->narHash = hashString(htSHA256, *nar);
if (info.narHash && info.narHash != narInfo->narHash)
throw Error("refusing to copy corrupted path '%1%' to binary cache", printStorePath(info.path));
auto accessor_ = std::dynamic_pointer_cast<RemoteFSAccessor>(accessor); auto accessor_ = std::dynamic_pointer_cast<RemoteFSAccessor>(accessor);
auto narAccessor = makeNarAccessor(nar); auto narAccessor = makeNarAccessor(nar);
@ -166,27 +222,9 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource
upsertFile(std::string(info.path.to_string()) + ".ls", jsonOut.str(), "application/json"); upsertFile(std::string(info.path.to_string()) + ".ls", jsonOut.str(), "application/json");
} }
#endif
/* Compress the NAR. */ #if 0
narInfo->compression = compression;
auto now1 = std::chrono::steady_clock::now();
auto narCompressed = compress(compression, *nar, parallelCompression);
auto now2 = std::chrono::steady_clock::now();
narInfo->fileHash = hashString(htSHA256, *narCompressed);
narInfo->fileSize = narCompressed->size();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
printMsg(lvlTalkative, "copying path '%1%' (%2% bytes, compressed %3$.1f%% in %4% ms) to binary cache",
printStorePath(narInfo->path), narInfo->narSize,
((1.0 - (double) narCompressed->size() / nar->size()) * 100.0),
duration);
narInfo->url = "nar/" + narInfo->fileHash.to_string(Base32, false) + ".nar"
+ (compression == "xz" ? ".xz" :
compression == "bzip2" ? ".bz2" :
compression == "br" ? ".br" :
"");
/* Optionally maintain an index of DWARF debug info files /* Optionally maintain an index of DWARF debug info files
consisting of JSON files named 'debuginfo/<build-id>' that consisting of JSON files named 'debuginfo/<build-id>' that
specify the NAR file and member containing the debug info. */ specify the NAR file and member containing the debug info. */
@ -243,16 +281,18 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource
threadPool.process(); threadPool.process();
} }
} }
#endif
/* Atomically write the NAR file. */ /* Atomically write the NAR file. */
if (repair || !fileExists(narInfo->url)) { if (repair || !fileExists(narInfo->url)) {
stats.narWrite++; stats.narWrite++;
upsertFile(narInfo->url, *narCompressed, "application/x-nix-nar"); FileSource source(fnTemp);
upsertFile(narInfo->url, source, "application/x-nix-nar");
} else } else
stats.narWriteAverted++; stats.narWriteAverted++;
stats.narWriteBytes += nar->size(); stats.narWriteBytes += info.narSize;
stats.narWriteCompressedBytes += narCompressed->size(); stats.narWriteCompressedBytes += fileSize;
stats.narWriteCompressionTimeMs += duration; stats.narWriteCompressionTimeMs += duration;
/* Atomically write the NAR info file.*/ /* Atomically write the NAR info file.*/

View file

@ -36,9 +36,13 @@ public:
virtual bool fileExists(const std::string & path) = 0; virtual bool fileExists(const std::string & path) = 0;
virtual void upsertFile(const std::string & path, virtual void upsertFile(const std::string & path,
const std::string & data, Source & source,
const std::string & mimeType) = 0; const std::string & mimeType) = 0;
void upsertFile(const std::string & path,
const std::string & data,
const std::string & mimeType);
/* Note: subclasses must implement at least one of the two /* Note: subclasses must implement at least one of the two
following getFile() methods. */ following getFile() methods. */

View file

@ -731,7 +731,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
if (GET_PROTOCOL_MINOR(clientVersion) >= 21) if (GET_PROTOCOL_MINOR(clientVersion) >= 21)
source = std::make_unique<TunnelSource>(from, to); source = std::make_unique<TunnelSource>(from, to);
else { else {
TeeSink tee(from); TeeParseSink tee(from);
parseDump(tee, tee.source); parseDump(tee, tee.source);
saved = std::move(*tee.source.data); saved = std::move(*tee.source.data);
source = std::make_unique<StringSource>(saved); source = std::make_unique<StringSource>(saved);

View file

@ -7,24 +7,6 @@
namespace nix { namespace nix {
struct HashAndWriteSink : Sink
{
Sink & writeSink;
HashSink hashSink;
HashAndWriteSink(Sink & writeSink) : writeSink(writeSink), hashSink(htSHA256)
{
}
virtual void operator () (const unsigned char * data, size_t len)
{
writeSink(data, len);
hashSink(data, len);
}
Hash currentHash()
{
return hashSink.currentHash().first;
}
};
void Store::exportPaths(const StorePathSet & paths, Sink & sink) void Store::exportPaths(const StorePathSet & paths, Sink & sink)
{ {
auto sorted = topoSortPaths(paths); auto sorted = topoSortPaths(paths);
@ -47,23 +29,24 @@ void Store::exportPath(const StorePath & path, Sink & sink)
{ {
auto info = queryPathInfo(path); auto info = queryPathInfo(path);
HashAndWriteSink hashAndWriteSink(sink); HashSink hashSink(htSHA256);
TeeSink teeSink(sink, hashSink);
narFromPath(path, hashAndWriteSink); narFromPath(path, teeSink);
/* Refuse to export paths that have changed. This prevents /* Refuse to export paths that have changed. This prevents
filesystem corruption from spreading to other machines. filesystem corruption from spreading to other machines.
Don't complain if the stored hash is zero (unknown). */ Don't complain if the stored hash is zero (unknown). */
Hash hash = hashAndWriteSink.currentHash(); Hash hash = hashSink.currentHash().first;
if (hash != info->narHash && info->narHash != Hash(*info->narHash.type)) if (hash != info->narHash && info->narHash != Hash(*info->narHash.type))
throw Error("hash of path '%s' has changed from '%s' to '%s'!", throw Error("hash of path '%s' has changed from '%s' to '%s'!",
printStorePath(path), info->narHash.to_string(Base32, true), hash.to_string(Base32, true)); printStorePath(path), info->narHash.to_string(Base32, true), hash.to_string(Base32, true));
hashAndWriteSink teeSink
<< exportMagic << exportMagic
<< printStorePath(path); << printStorePath(path);
writeStorePaths(*this, hashAndWriteSink, info->references); writeStorePaths(*this, teeSink, info->references);
hashAndWriteSink teeSink
<< (info->deriver ? printStorePath(*info->deriver) : "") << (info->deriver ? printStorePath(*info->deriver) : "")
<< 0; << 0;
} }
@ -77,7 +60,7 @@ StorePaths Store::importPaths(Source & source, std::shared_ptr<FSAccessor> acces
if (n != 1) throw Error("input doesn't look like something created by 'nix-store --export'"); if (n != 1) throw Error("input doesn't look like something created by 'nix-store --export'");
/* Extract the NAR from the source. */ /* Extract the NAR from the source. */
TeeSink tee(source); TeeParseSink tee(source);
parseDump(tee, tee.source); parseDump(tee, tee.source);
uint32_t magic = readInt(source); uint32_t magic = readInt(source);

View file

@ -100,11 +100,11 @@ protected:
} }
void upsertFile(const std::string & path, void upsertFile(const std::string & path,
const std::string & data, Source & source,
const std::string & mimeType) override const std::string & mimeType) override
{ {
auto req = FileTransferRequest(cacheUri + "/" + path); auto req = FileTransferRequest(cacheUri + "/" + path);
req.data = std::make_shared<string>(data); // FIXME: inefficient req.data = std::make_shared<string>(source.drain());
req.mimeType = mimeType; req.mimeType = mimeType;
try { try {
getFileTransfer()->upload(req); getFileTransfer()->upload(req);

View file

@ -31,8 +31,17 @@ protected:
bool fileExists(const std::string & path) override; bool fileExists(const std::string & path) override;
void upsertFile(const std::string & path, void upsertFile(const std::string & path,
const std::string & data, Source & source,
const std::string & mimeType) override; const std::string & mimeType)
{
auto path2 = binaryCacheDir + "/" + path;
Path tmp = path2 + ".tmp." + std::to_string(getpid());
AutoDelete del(tmp, false);
writeFile(tmp, source);
if (rename(tmp.c_str(), path2.c_str()))
throw SysError("renaming '%1%' to '%2%'", tmp, path2);
del.cancel();
}
void getFile(const std::string & path, Sink & sink) override void getFile(const std::string & path, Sink & sink) override
{ {
@ -70,28 +79,11 @@ void LocalBinaryCacheStore::init()
BinaryCacheStore::init(); BinaryCacheStore::init();
} }
static void atomicWrite(const Path & path, const std::string & s)
{
Path tmp = path + ".tmp." + std::to_string(getpid());
AutoDelete del(tmp, false);
writeFile(tmp, s);
if (rename(tmp.c_str(), path.c_str()))
throw SysError("renaming '%1%' to '%2%'", tmp, path);
del.cancel();
}
bool LocalBinaryCacheStore::fileExists(const std::string & path) bool LocalBinaryCacheStore::fileExists(const std::string & path)
{ {
return pathExists(binaryCacheDir + "/" + path); return pathExists(binaryCacheDir + "/" + path);
} }
void LocalBinaryCacheStore::upsertFile(const std::string & path,
const std::string & data,
const std::string & mimeType)
{
atomicWrite(binaryCacheDir + "/" + path, data);
}
static RegisterStoreImplementation regStore([]( static RegisterStoreImplementation regStore([](
const std::string & uri, const Store::Params & params) const std::string & uri, const Store::Params & params)
-> std::shared_ptr<Store> -> std::shared_ptr<Store>

View file

@ -355,9 +355,10 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
stats.put++; stats.put++;
} }
void upsertFile(const std::string & path, const std::string & data, void upsertFile(const std::string & path, Source & source,
const std::string & mimeType) override const std::string & mimeType) override
{ {
auto data = source.drain();
if (narinfoCompression != "" && hasSuffix(path, ".narinfo")) if (narinfoCompression != "" && hasSuffix(path, ".narinfo"))
uploadFile(path, *compress(narinfoCompression, data), mimeType, narinfoCompression); uploadFile(path, *compress(narinfoCompression, data), mimeType, narinfoCompression);
else if (lsCompression != "" && hasSuffix(path, ".ls")) else if (lsCompression != "" && hasSuffix(path, ".ls"))

View file

@ -63,11 +63,11 @@ struct ParseSink
virtual void createSymlink(const Path & path, const string & target) { }; virtual void createSymlink(const Path & path, const string & target) { };
}; };
struct TeeSink : ParseSink struct TeeParseSink : ParseSink
{ {
TeeSource source; TeeSource source;
TeeSink(Source & source) : source(source) { } TeeParseSink(Source & source) : source(source) { }
}; };
void parseDump(ParseSink & sink, Source & source); void parseDump(ParseSink & sink, Source & source);

View file

@ -166,6 +166,19 @@ struct StringSource : Source
}; };
/* A sink that writes all incoming data to two other sinks. */
struct TeeSink : Sink
{
Sink & sink1, & sink2;
TeeSink(Sink & sink1, Sink & sink2) : sink1(sink1), sink2(sink2) { }
virtual void operator () (const unsigned char * data, size_t len)
{
sink1(data, len);
sink2(data, len);
}
};
/* Adapter class of a Source that saves all data read to `s'. */ /* Adapter class of a Source that saves all data read to `s'. */
struct TeeSource : Source struct TeeSource : Source
{ {