Merge pull request #3801 from obsidiansystems/from-dump-stream

Constant space `addToStoreFromDump` and deduplicate code
This commit is contained in:
Eelco Dolstra 2020-07-21 15:11:52 +02:00 committed by GitHub
commit 0835447eaa
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 100 additions and 119 deletions

View file

@ -2774,7 +2774,7 @@ struct RestrictedStore : public LocalFSStore
goal.addDependency(info.path);
}
StorePath addToStoreFromDump(const string & dump, const string & name,
StorePath addToStoreFromDump(Source & dump, const string & name,
FileIngestionMethod method = FileIngestionMethod::Recursive, HashType hashAlgo = htSHA256, RepairFlag repair = NoRepair) override
{
auto path = next->addToStoreFromDump(dump, name, method, hashAlgo, repair);

View file

@ -350,21 +350,24 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
}
case wopAddToStore: {
std::string s, baseName;
HashType hashAlgo;
std::string baseName;
FileIngestionMethod method;
{
bool fixed; uint8_t recursive;
from >> baseName >> fixed /* obsolete */ >> recursive >> s;
bool fixed;
uint8_t recursive;
std::string hashAlgoRaw;
from >> baseName >> fixed /* obsolete */ >> recursive >> hashAlgoRaw;
if (recursive > (uint8_t) FileIngestionMethod::Recursive)
throw Error("unsupported FileIngestionMethod with value of %i; you may need to upgrade nix-daemon", recursive);
method = FileIngestionMethod { recursive };
/* Compatibility hack. */
if (!fixed) {
s = "sha256";
hashAlgoRaw = "sha256";
method = FileIngestionMethod::Recursive;
}
hashAlgo = parseHashType(hashAlgoRaw);
}
HashType hashAlgo = parseHashType(s);
StringSink saved;
TeeSource savedNARSource(from, saved);
@ -382,7 +385,9 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
logger->startWork();
if (!savedRegular.regular) throw Error("regular file expected");
auto path = store->addToStoreFromDump(*saved.s, baseName, method, hashAlgo);
// FIXME: try to stream directly from `from`.
StringSource dumpSource { *saved.s };
auto path = store->addToStoreFromDump(dumpSource, baseName, method, hashAlgo);
logger->stopWork();
to << store->printStorePath(path);

View file

@ -1033,82 +1033,26 @@ void LocalStore::addToStore(const ValidPathInfo & info, Source & source,
}
StorePath LocalStore::addToStoreFromDump(const string & dump, const string & name,
FileIngestionMethod method, HashType hashAlgo, RepairFlag repair)
{
Hash h = hashString(hashAlgo, dump);
auto dstPath = makeFixedOutputPath(method, h, name);
addTempRoot(dstPath);
if (repair || !isValidPath(dstPath)) {
/* The first check above is an optimisation to prevent
unnecessary lock acquisition. */
auto realPath = Store::toRealPath(dstPath);
PathLocks outputLock({realPath});
if (repair || !isValidPath(dstPath)) {
deletePath(realPath);
autoGC();
if (method == FileIngestionMethod::Recursive) {
StringSource source(dump);
restorePath(realPath, source);
} else
writeFile(realPath, dump);
canonicalisePathMetaData(realPath, -1);
/* Register the SHA-256 hash of the NAR serialisation of
the path in the database. We may just have computed it
above (if called with recursive == true and hashAlgo ==
sha256); otherwise, compute it here. */
HashResult hash;
if (method == FileIngestionMethod::Recursive) {
hash.first = hashAlgo == htSHA256 ? h : hashString(htSHA256, dump);
hash.second = dump.size();
} else
hash = hashPath(htSHA256, realPath);
optimisePath(realPath); // FIXME: combine with hashPath()
ValidPathInfo info(dstPath);
info.narHash = hash.first;
info.narSize = hash.second;
info.ca = FixedOutputHash { .method = method, .hash = h };
registerValidPath(info);
}
outputLock.setDeletion(true);
}
return dstPath;
}
StorePath LocalStore::addToStore(const string & name, const Path & _srcPath,
FileIngestionMethod method, HashType hashAlgo, PathFilter & filter, RepairFlag repair)
{
Path srcPath(absPath(_srcPath));
auto source = sinkToSource([&](Sink & sink) {
if (method == FileIngestionMethod::Recursive)
dumpPath(srcPath, sink, filter);
else
readFile(srcPath, sink);
});
return addToStoreFromDump(*source, name, method, hashAlgo, repair);
}
if (method != FileIngestionMethod::Recursive)
return addToStoreFromDump(readFile(srcPath), name, method, hashAlgo, repair);
/* For computing the NAR hash. */
auto sha256Sink = std::make_unique<HashSink>(htSHA256);
/* For computing the store path. In recursive SHA-256 mode, this
is the same as the NAR hash, so no need to do it again. */
std::unique_ptr<HashSink> hashSink =
hashAlgo == htSHA256
? nullptr
: std::make_unique<HashSink>(hashAlgo);
StorePath LocalStore::addToStoreFromDump(Source & source0, const string & name,
FileIngestionMethod method, HashType hashAlgo, RepairFlag repair)
{
/* For computing the store path. */
auto hashSink = std::make_unique<HashSink>(hashAlgo);
TeeSource source { source0, *hashSink };
/* Read the source path into memory, but only if it's up to
narBufferSize bytes. If it's larger, write it to a temporary
@ -1116,55 +1060,49 @@ StorePath LocalStore::addToStore(const string & name, const Path & _srcPath,
destination store path is already valid, we just delete the
temporary path. Otherwise, we move it to the destination store
path. */
bool inMemory = true;
std::string nar;
bool inMemory = false;
auto source = sinkToSource([&](Sink & sink) {
std::string dump;
LambdaSink sink2([&](const unsigned char * buf, size_t len) {
(*sha256Sink)(buf, len);
if (hashSink) (*hashSink)(buf, len);
if (inMemory) {
if (nar.size() + len > settings.narBufferSize) {
inMemory = false;
sink << 1;
sink((const unsigned char *) nar.data(), nar.size());
nar.clear();
} else {
nar.append((const char *) buf, len);
}
}
if (!inMemory) sink(buf, len);
});
dumpPath(srcPath, sink2, filter);
});
/* Fill out buffer, and decide whether we are working strictly in
memory based on whether we break out because the buffer is full
or the original source is empty */
while (dump.size() < settings.narBufferSize) {
auto oldSize = dump.size();
constexpr size_t chunkSize = 1024;
auto want = std::min(chunkSize, settings.narBufferSize - oldSize);
dump.resize(oldSize + want);
auto got = 0;
try {
got = source.read((uint8_t *) dump.data() + oldSize, want);
} catch (EndOfFile &) {
inMemory = true;
break;
}
dump.resize(oldSize + got);
}
std::unique_ptr<AutoDelete> delTempDir;
Path tempPath;
try {
/* Wait for the source coroutine to give us some dummy
data. This is so that we don't create the temporary
directory if the NAR fits in memory. */
readInt(*source);
if (!inMemory) {
/* Drain what we pulled so far, and then keep on pulling */
StringSource dumpSource { dump };
ChainSource bothSource { dumpSource, source };
auto tempDir = createTempDir(realStoreDir, "add");
delTempDir = std::make_unique<AutoDelete>(tempDir);
tempPath = tempDir + "/x";
restorePath(tempPath, *source);
if (method == FileIngestionMethod::Recursive)
restorePath(tempPath, bothSource);
else
writeFile(tempPath, bothSource);
} catch (EndOfFile &) {
if (!inMemory) throw;
/* The NAR fits in memory, so we didn't do restorePath(). */
dump.clear();
}
auto sha256 = sha256Sink->finish();
Hash hash = hashSink ? hashSink->finish().first : sha256.first;
auto [hash, size] = hashSink->finish();
auto dstPath = makeFixedOutputPath(method, hash, name);
@ -1186,22 +1124,34 @@ StorePath LocalStore::addToStore(const string & name, const Path & _srcPath,
autoGC();
if (inMemory) {
StringSource dumpSource { dump };
/* Restore from the NAR in memory. */
StringSource source(nar);
restorePath(realPath, source);
if (method == FileIngestionMethod::Recursive)
restorePath(realPath, dumpSource);
else
writeFile(realPath, dumpSource);
} else {
/* Move the temporary path we restored above. */
if (rename(tempPath.c_str(), realPath.c_str()))
throw Error("renaming '%s' to '%s'", tempPath, realPath);
}
/* For computing the nar hash. In recursive SHA-256 mode, this
is the same as the store hash, so no need to do it again. */
auto narHash = std::pair { hash, size };
if (method != FileIngestionMethod::Recursive || hashAlgo != htSHA256) {
HashSink narSink { htSHA256 };
dumpPath(realPath, narSink);
narHash = narSink.finish();
}
canonicalisePathMetaData(realPath, -1); // FIXME: merge into restorePath
optimisePath(realPath);
ValidPathInfo info(dstPath);
info.narHash = sha256.first;
info.narSize = sha256.second;
info.narHash = narHash.first;
info.narSize = narHash.second;
info.ca = FixedOutputHash { .method = method, .hash = hash };
registerValidPath(info);
}

View file

@ -153,7 +153,7 @@ public:
in `dump', which is either a NAR serialisation (if recursive ==
true) or simply the contents of a regular file (if recursive ==
false). */
StorePath addToStoreFromDump(const string & dump, const string & name,
StorePath addToStoreFromDump(Source & dump, const string & name,
FileIngestionMethod method = FileIngestionMethod::Recursive, HashType hashAlgo = htSHA256, RepairFlag repair = NoRepair) override;
StorePath addTextToStore(const string & name, const string & s,

View file

@ -460,7 +460,7 @@ public:
std::optional<Hash> expectedCAHash = {});
// FIXME: remove?
virtual StorePath addToStoreFromDump(const string & dump, const string & name,
virtual StorePath addToStoreFromDump(Source & dump, const string & name,
FileIngestionMethod method = FileIngestionMethod::Recursive, HashType hashAlgo = htSHA256, RepairFlag repair = NoRepair)
{
throw Error("addToStoreFromDump() is not supported by this store");

View file

@ -322,5 +322,18 @@ void StringSink::operator () (const unsigned char * data, size_t len)
s->append((const char *) data, len);
}
size_t ChainSource::read(unsigned char * data, size_t len)
{
if (useSecond) {
return source2.read(data, len);
} else {
try {
return source1.read(data, len);
} catch (EndOfFile &) {
useSecond = true;
return this->read(data, len);
}
}
}
}

View file

@ -189,7 +189,7 @@ struct TeeSource : Source
size_t read(unsigned char * data, size_t len)
{
size_t n = orig.read(data, len);
sink(data, len);
sink(data, n);
return n;
}
};
@ -256,6 +256,19 @@ struct LambdaSource : Source
}
};
/* Chain two sources together so after the first is exhausted, the second is
used */
struct ChainSource : Source
{
Source & source1, & source2;
bool useSecond = false;
ChainSource(Source & s1, Source & s2)
: source1(s1), source2(s2)
{ }
size_t read(unsigned char * data, size_t len) override;
};
/* Convert a function that feeds data into a Sink into a Source. The
Source executes the function as a coroutine. */