LocalStore::addToStoreFromDump copy in chunks

Rather than copying one byte at a time, we tell the coroutine how many
bytes the reader currently wants, so that it can send the data back to us
in chunks of up to that size.
This commit is contained in:
John Ericson 2020-07-11 23:40:49 +00:00
parent 9de96ef7d4
commit 592851fb67
4 changed files with 40 additions and 22 deletions

View file

@ -1036,11 +1036,13 @@ void LocalStore::addToStore(const ValidPathInfo & info, Source & source,
StorePath LocalStore::addToStoreFromDump(Source & dump, const string & name, StorePath LocalStore::addToStoreFromDump(Source & dump, const string & name,
FileIngestionMethod method, HashType hashAlgo, RepairFlag repair) FileIngestionMethod method, HashType hashAlgo, RepairFlag repair)
{ {
return addToStoreCommon(name, method, hashAlgo, repair, [&](auto & sink) { return addToStoreCommon(name, method, hashAlgo, repair, [&](auto & sink, size_t & wanted) {
while (1) { while (1) {
uint8_t buf[1]; constexpr size_t bufSize = 1024;
auto n = dump.read(buf, 1); uint8_t buf[bufSize];
auto n = dump.read(buf, std::min(wanted, bufSize));
sink(buf, n); sink(buf, n);
// When control is yielded back to us, `wanted` will have been updated.
} }
}); });
} }
@ -1051,7 +1053,7 @@ StorePath LocalStore::addToStore(const string & name, const Path & _srcPath,
{ {
Path srcPath(absPath(_srcPath)); Path srcPath(absPath(_srcPath));
return addToStoreCommon(name, method, hashAlgo, repair, [&](auto & sink) { return addToStoreCommon(name, method, hashAlgo, repair, [&](auto & sink, size_t & _) {
if (method == FileIngestionMethod::Recursive) if (method == FileIngestionMethod::Recursive)
dumpPath(srcPath, sink, filter); dumpPath(srcPath, sink, filter);
else else
@ -1062,7 +1064,7 @@ StorePath LocalStore::addToStore(const string & name, const Path & _srcPath,
StorePath LocalStore::addToStoreCommon( StorePath LocalStore::addToStoreCommon(
const string & name, FileIngestionMethod method, HashType hashAlgo, RepairFlag repair, const string & name, FileIngestionMethod method, HashType hashAlgo, RepairFlag repair,
std::function<void(Sink &)> demux) std::function<void(Sink &, size_t &)> demux)
{ {
/* For computing the NAR hash. */ /* For computing the NAR hash. */
auto sha256Sink = std::make_unique<HashSink>(htSHA256); auto sha256Sink = std::make_unique<HashSink>(htSHA256);
@ -1083,7 +1085,7 @@ StorePath LocalStore::addToStoreCommon(
bool inMemory = true; bool inMemory = true;
std::string nar; std::string nar;
auto source = sinkToSource([&](Sink & sink) { auto source = sinkToSource([&](Sink & sink, size_t & wanted) {
LambdaSink sink2([&](const unsigned char * buf, size_t len) { LambdaSink sink2([&](const unsigned char * buf, size_t len) {
(*sha256Sink)(buf, len); (*sha256Sink)(buf, len);
if (hashSink) (*hashSink)(buf, len); if (hashSink) (*hashSink)(buf, len);
@ -1101,7 +1103,7 @@ StorePath LocalStore::addToStoreCommon(
if (!inMemory) sink(buf, len); if (!inMemory) sink(buf, len);
}); });
demux(sink2); demux(sink2, wanted);
}); });
std::unique_ptr<AutoDelete> delTempDir; std::unique_ptr<AutoDelete> delTempDir;

View file

@ -292,7 +292,7 @@ private:
StorePath addToStoreCommon( StorePath addToStoreCommon(
const string & name, FileIngestionMethod method, HashType hashAlgo, RepairFlag repair, const string & name, FileIngestionMethod method, HashType hashAlgo, RepairFlag repair,
std::function<void(Sink &)> demux); std::function<void(Sink &, size_t &)> demux);
Path getRealStoreDir() override { return realStoreDir; } Path getRealStoreDir() override { return realStoreDir; }

View file

@ -165,35 +165,43 @@ size_t StringSource::read(unsigned char * data, size_t len)
#endif #endif
std::unique_ptr<Source> sinkToSource( std::unique_ptr<Source> sinkToSource(
std::function<void(Sink &)> fun, std::function<void(Sink &, size_t &)> fun,
std::function<void()> eof) std::function<void()> eof)
{ {
struct SinkToSource : Source struct SinkToSource : Source
{ {
typedef boost::coroutines2::coroutine<std::string> coro_t; typedef boost::coroutines2::coroutine<std::basic_string<uint8_t>> coro_t;
std::function<void(Sink &)> fun; std::function<void(Sink &, size_t &)> fun;
std::function<void()> eof; std::function<void()> eof;
std::optional<coro_t::pull_type> coro; std::optional<coro_t::pull_type> coro;
bool started = false; bool started = false;
SinkToSource(std::function<void(Sink &)> fun, std::function<void()> eof) /* It would be nicer to have the co-routines have both args and a
return value, but unfortunately that was removed from Boost's
implementation for some reason, so we use some extra state instead.
*/
size_t wanted = 0;
SinkToSource(std::function<void(Sink &, size_t &)> fun, std::function<void()> eof)
: fun(fun), eof(eof) : fun(fun), eof(eof)
{ {
} }
std::string cur; std::basic_string<uint8_t> cur;
size_t pos = 0; size_t pos = 0;
size_t read(unsigned char * data, size_t len) override size_t read(unsigned char * data, size_t len) override
{ {
if (!coro) wanted = len < cur.size() ? 0 : len - cur.size();
if (!coro) {
coro = coro_t::pull_type([&](coro_t::push_type & yield) { coro = coro_t::pull_type([&](coro_t::push_type & yield) {
LambdaSink sink([&](const unsigned char * data, size_t len) { LambdaSink sink([&](const uint8_t * data, size_t len) {
if (len) yield(std::string((const char *) data, len)); if (len) yield(std::basic_string<uint8_t> { data, len });
}); });
fun(sink); fun(sink, wanted);
}); });
}
if (!*coro) { eof(); abort(); } if (!*coro) { eof(); abort(); }
@ -203,11 +211,10 @@ std::unique_ptr<Source> sinkToSource(
pos = 0; pos = 0;
} }
auto n = std::min(cur.size() - pos, len); auto numCopied = cur.copy(data, len, pos);
memcpy(data, (unsigned char *) cur.data() + pos, n); pos += numCopied;
pos += n;
return n; return numCopied;
} }
}; };

View file

@ -260,11 +260,20 @@ struct LambdaSource : Source
/* Convert a function that feeds data into a Sink into a Source. The /* Convert a function that feeds data into a Sink into a Source. The
Source executes the function as a coroutine. */ Source executes the function as a coroutine. */
std::unique_ptr<Source> sinkToSource( std::unique_ptr<Source> sinkToSource(
std::function<void(Sink &)> fun, std::function<void(Sink &, size_t &)> fun,
std::function<void()> eof = []() { std::function<void()> eof = []() {
throw EndOfFile("coroutine has finished"); throw EndOfFile("coroutine has finished");
}); });
/* Compatibility overload for producers that only take a Sink.

   Wraps `fun` in an adapter with the two-argument (Sink &, size_t &)
   signature and forwards to the primary sinkToSource() overload. The
   size_t reference handed to the adapter is ignored, so `fun` is invoked
   exactly as it was written.

   `eof` is passed through unchanged; by default it throws EndOfFile. */
static inline std::unique_ptr<Source> sinkToSource(
    std::function<void(Sink &)> fun,
    std::function<void()> eof = []() {
        throw EndOfFile("coroutine has finished");
    })
{
    return sinkToSource(
        [fun](Sink & sink, size_t & /* unused */) {
            fun(sink);
        },
        eof);
}
void writePadding(size_t len, Sink & sink); void writePadding(size_t len, Sink & sink);
void writeString(const unsigned char * buf, size_t len, Sink & sink); void writeString(const unsigned char * buf, size_t len, Sink & sink);