From d094dd0396a9ec0b4ce725412cc73c6d9af31021 Mon Sep 17 00:00:00 2001 From: eldritch horrors Date: Sat, 18 May 2024 19:24:17 +0200 Subject: [PATCH] libstore: remove remaining sinkToSource uses Change-Id: Id1ee0d2ad4a3774f4bbb960d76f0f76ac4f3eff9 --- src/libstore/remote-store.cc | 29 +++++++++----- src/libstore/store-api.cc | 78 +++++++++++++++++++----------------- 2 files changed, 61 insertions(+), 46 deletions(-) diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index 56b6093bc..1b0524316 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -485,17 +485,26 @@ void RemoteStore::addMultipleToStore( { auto remoteVersion = getProtocol(); - auto source = sinkToSource([&](Sink & sink) { - sink << pathsToCopy.size(); - for (auto & [pathInfo, pathSource] : pathsToCopy) { - sink << WorkerProto::Serialise::write(*this, - WorkerProto::WriteConn {remoteVersion}, - pathInfo); - pathSource->drainInto(sink); - } - }); + GeneratorSource source{ + [](auto self, auto & pathsToCopy, auto remoteVersion) -> WireFormatGenerator { + co_yield pathsToCopy.size(); + for (auto & [pathInfo, pathSource] : pathsToCopy) { + co_yield WorkerProto::Serialise::write(*self, + WorkerProto::WriteConn {remoteVersion}, + pathInfo); + try { + char buf[65536]; + while (true) { + const auto read = pathSource->read(buf, sizeof(buf)); + co_yield std::span{buf, read}; + } + } catch (EndOfFile &) { + } + } + }(this, pathsToCopy, remoteVersion) + }; - addMultipleToStore(*source, repair, checkSigs); + addMultipleToStore(source, repair, checkSigs); } void RemoteStore::addMultipleToStore( diff --git a/src/libstore/store-api.cc b/src/libstore/store-api.cc index 28a414555..cb027d311 100644 --- a/src/libstore/store-api.cc +++ b/src/libstore/store-api.cc @@ -335,9 +335,9 @@ void Store::addMultipleToStore( info.ultimate = false; /* Make sure that the Source object is destroyed when - we're done. In particular, a SinkToSource object must - be destroyed to ensure that the destructors on its - stack frame are run; this includes + we're done. In particular, a coroutine object must + be destroyed to ensure that the destructors in its + state are run; this includes LegacySSHStore::narFromPath()'s connection lock. */ auto source = std::move(source_); @@ -1059,16 +1059,19 @@ void copyStorePath( info = info2; } - auto source = sinkToSource([&](Sink & sink) { - LambdaSink progressSink([&, total = 0ULL](std::string_view data) mutable { - total += data.size(); - act.progress(total, info->narSize); - }); - TeeSink tee { sink, progressSink }; - tee << srcStore.narFromPath(storePath); - }); + GeneratorSource source{ + [](auto & act, auto & info, auto & srcStore, auto & storePath) -> WireFormatGenerator { + auto nar = srcStore.narFromPath(storePath); + uint64_t total = 0; + while (auto data = nar.next()) { + total += data->size(); + act.progress(total, info->narSize); + co_yield *data; + } + }(act, info, srcStore, storePath) + }; - dstStore.addToStore(*info, *source, repair, checkSigs); + dstStore.addToStore(*info, source, repair, checkSigs); } @@ -1180,31 +1183,34 @@ std::map copyPaths( ValidPathInfo infoForDst = *info; infoForDst.path = storePathForDst; - auto source = - sinkToSource([&srcStore, &dstStore, missingPath = missingPath, info = std::move(info)](Sink & sink) { - // We can reasonably assume that the copy will happen whenever we - // read the path, so log something about that at that point - auto srcUri = srcStore.getUri(); - auto dstUri = dstStore.getUri(); - auto storePathS = srcStore.printStorePath(missingPath); - Activity act( - *logger, - lvlInfo, - actCopyPath, - makeCopyPathMessage(srcUri, dstUri, storePathS), - {storePathS, srcUri, dstUri} - ); - PushActivity pact(act.id); + auto source = [](auto & srcStore, auto & dstStore, auto missingPath, auto info + ) -> WireFormatGenerator { + // We can reasonably assume that the copy will happen whenever we + // read the path, so log something about that at that point + auto srcUri = srcStore.getUri(); + auto dstUri = dstStore.getUri(); + auto storePathS = srcStore.printStorePath(missingPath); + Activity act( + *logger, + lvlInfo, + actCopyPath, + makeCopyPathMessage(srcUri, dstUri, storePathS), + {storePathS, srcUri, dstUri} + ); + PushActivity pact(act.id); - LambdaSink progressSink([&, total = 0ULL](std::string_view data) mutable { - total += data.size(); - act.progress(total, info->narSize); - }); - TeeSink tee{sink, progressSink}; - - tee << srcStore.narFromPath(missingPath); - }); - pathsToCopy.push_back(std::pair{infoForDst, std::move(source)}); + auto nar = srcStore.narFromPath(missingPath); + uint64_t total = 0; + while (auto data = nar.next()) { + total += data->size(); + act.progress(total, info->narSize); + co_yield *data; + } + }; + pathsToCopy.push_back(std::pair{ + infoForDst, + std::make_unique(source(srcStore, dstStore, missingPath, info)) + }); } dstStore.addMultipleToStore(pathsToCopy, act, repair, checkSigs);