libstore: remove remaining sinkToSource uses

Change-Id: Id1ee0d2ad4a3774f4bbb960d76f0f76ac4f3eff9
This commit is contained in:
eldritch horrors 2024-05-18 19:24:17 +02:00
parent 6b4d46e9e0
commit d094dd0396
2 changed files with 61 additions and 46 deletions

View file

@ -485,17 +485,26 @@ void RemoteStore::addMultipleToStore(
{ {
auto remoteVersion = getProtocol(); auto remoteVersion = getProtocol();
auto source = sinkToSource([&](Sink & sink) { GeneratorSource source{
sink << pathsToCopy.size(); [](auto self, auto & pathsToCopy, auto remoteVersion) -> WireFormatGenerator {
for (auto & [pathInfo, pathSource] : pathsToCopy) { co_yield pathsToCopy.size();
sink << WorkerProto::Serialise<ValidPathInfo>::write(*this, for (auto & [pathInfo, pathSource] : pathsToCopy) {
WorkerProto::WriteConn {remoteVersion}, co_yield WorkerProto::Serialise<ValidPathInfo>::write(*self,
pathInfo); WorkerProto::WriteConn {remoteVersion},
pathSource->drainInto(sink); pathInfo);
} try {
}); char buf[65536];
while (true) {
const auto read = pathSource->read(buf, sizeof(buf));
co_yield std::span{buf, read};
}
} catch (EndOfFile &) {
}
}
}(this, pathsToCopy, remoteVersion)
};
addMultipleToStore(*source, repair, checkSigs); addMultipleToStore(source, repair, checkSigs);
} }
void RemoteStore::addMultipleToStore( void RemoteStore::addMultipleToStore(

View file

@ -335,9 +335,9 @@ void Store::addMultipleToStore(
info.ultimate = false; info.ultimate = false;
/* Make sure that the Source object is destroyed when /* Make sure that the Source object is destroyed when
we're done. In particular, a SinkToSource object must we're done. In particular, a coroutine object must
be destroyed to ensure that the destructors on its be destroyed to ensure that the destructors in its
stack frame are run; this includes state are run; this includes
LegacySSHStore::narFromPath()'s connection lock. */ LegacySSHStore::narFromPath()'s connection lock. */
auto source = std::move(source_); auto source = std::move(source_);
@ -1059,16 +1059,19 @@ void copyStorePath(
info = info2; info = info2;
} }
auto source = sinkToSource([&](Sink & sink) { GeneratorSource source{
LambdaSink progressSink([&, total = 0ULL](std::string_view data) mutable { [](auto & act, auto & info, auto & srcStore, auto & storePath) -> WireFormatGenerator {
total += data.size(); auto nar = srcStore.narFromPath(storePath);
act.progress(total, info->narSize); uint64_t total = 0;
}); while (auto data = nar.next()) {
TeeSink tee { sink, progressSink }; total += data->size();
tee << srcStore.narFromPath(storePath); act.progress(total, info->narSize);
}); co_yield *data;
}
}(act, info, srcStore, storePath)
};
dstStore.addToStore(*info, *source, repair, checkSigs); dstStore.addToStore(*info, source, repair, checkSigs);
} }
@ -1180,31 +1183,34 @@ std::map<StorePath, StorePath> copyPaths(
ValidPathInfo infoForDst = *info; ValidPathInfo infoForDst = *info;
infoForDst.path = storePathForDst; infoForDst.path = storePathForDst;
auto source = auto source = [](auto & srcStore, auto & dstStore, auto missingPath, auto info
sinkToSource([&srcStore, &dstStore, missingPath = missingPath, info = std::move(info)](Sink & sink) { ) -> WireFormatGenerator {
// We can reasonably assume that the copy will happen whenever we // We can reasonably assume that the copy will happen whenever we
// read the path, so log something about that at that point // read the path, so log something about that at that point
auto srcUri = srcStore.getUri(); auto srcUri = srcStore.getUri();
auto dstUri = dstStore.getUri(); auto dstUri = dstStore.getUri();
auto storePathS = srcStore.printStorePath(missingPath); auto storePathS = srcStore.printStorePath(missingPath);
Activity act( Activity act(
*logger, *logger,
lvlInfo, lvlInfo,
actCopyPath, actCopyPath,
makeCopyPathMessage(srcUri, dstUri, storePathS), makeCopyPathMessage(srcUri, dstUri, storePathS),
{storePathS, srcUri, dstUri} {storePathS, srcUri, dstUri}
); );
PushActivity pact(act.id); PushActivity pact(act.id);
LambdaSink progressSink([&, total = 0ULL](std::string_view data) mutable { auto nar = srcStore.narFromPath(missingPath);
total += data.size(); uint64_t total = 0;
act.progress(total, info->narSize); while (auto data = nar.next()) {
}); total += data->size();
TeeSink tee{sink, progressSink}; act.progress(total, info->narSize);
co_yield *data;
tee << srcStore.narFromPath(missingPath); }
}); };
pathsToCopy.push_back(std::pair{infoForDst, std::move(source)}); pathsToCopy.push_back(std::pair{
infoForDst,
std::make_unique<GeneratorSource>(source(srcStore, dstStore, missingPath, info))
});
} }
dstStore.addMultipleToStore(pathsToCopy, act, repair, checkSigs); dstStore.addMultipleToStore(pathsToCopy, act, repair, checkSigs);