From c83b13eafd73a3a427348ed779e8d5f4804c8cb1 Mon Sep 17 00:00:00 2001 From: eldritch horrors Date: Mon, 28 Oct 2024 18:59:43 +0100 Subject: [PATCH] libstore: reunify all file transfer methods again with the api cleaned up we can suddenly reunify uploads, downloads, and existence checks through curl in the same wrapper function. uploads and existence checks simply don't use the result source, and given that all transfers (or at least *most* transfers to date) go through the network the few extra allocations do not hurt us at all. even for file:// calls the overhead won't much matter as going to disk and back *is* expensive Change-Id: I4f9ca6681a8fc303377b4cf4c63e3363ae32c18b --- src/libstore/filetransfer.cc | 156 +++++++++++------------------------ 1 file changed, 49 insertions(+), 107 deletions(-) diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index 03b3a045b..48ac39016 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -702,122 +702,42 @@ struct curlFileTransfer : public FileTransfer void upload(const std::string & uri, std::string data, const Headers & headers) override { - enqueueFileTransfer(uri, headers, std::move(data), false).get(); + enqueueFileTransfer(uri, headers, std::move(data), false); } - std::future> enqueueFileTransfer( + std::pair> enqueueFileTransfer( const std::string & uri, const Headers & headers, std::optional data, bool noBody ) - { - struct State { - std::string data; - }; - - auto _state = std::make_shared>(); - - auto [meta, done] = enqueueFileTransfer( - uri, - headers, - [](std::exception_ptr ex) { - if (ex) { - std::rethrow_exception(ex); - } - }, - [_state](std::string_view data) { - _state->lock()->data.append(data); - }, - std::move(data), - noBody - ); - - return std::async( - std::launch::deferred, - [_state, _meta{std::move(meta)}, done{std::move(done)}]() mutable { - auto meta = _meta.get(); - done.get(); - auto state(_state->lock()); - return std::pair(std::move(meta), std::move(state->data)); - } - ); - } - - std::pair, std::future> enqueueFileTransfer( - const std::string & uri, - const Headers & headers, - std::invocable auto doneCallback, - std::function dataCallback, - std::optional data, - bool noBody - ) { /* Ugly hack to support s3:// URIs. */ if (uri.starts_with("s3://")) { // FIXME: do this on a worker thread - return { - std::async( - std::launch::deferred, - [uri, dataCallback] { #if ENABLE_S3 - auto [bucketName, key, params] = parseS3Uri(uri); + auto [bucketName, key, params] = parseS3Uri(uri); - std::string profile = getOr(params, "profile", ""); - std::string region = getOr(params, "region", Aws::Region::US_EAST_1); - std::string scheme = getOr(params, "scheme", ""); - std::string endpoint = getOr(params, "endpoint", ""); + std::string profile = getOr(params, "profile", ""); + std::string region = getOr(params, "region", Aws::Region::US_EAST_1); + std::string scheme = getOr(params, "scheme", ""); + std::string endpoint = getOr(params, "endpoint", ""); - S3Helper s3Helper(profile, region, scheme, endpoint); + S3Helper s3Helper(profile, region, scheme, endpoint); - // FIXME: implement ETag - auto s3Res = s3Helper.getObject(bucketName, key); - FileTransferResult res; - if (!s3Res.data) - throw FileTransferError(NotFound, "S3 object '%s' does not exist", uri); - dataCallback(*s3Res.data); - return res; + // FIXME: implement ETag + auto s3Res = s3Helper.getObject(bucketName, key); + FileTransferResult res; + if (!s3Res.data) + throw FileTransferError(NotFound, "S3 object '%s' does not exist", uri); + return {res, make_box_ptr(std::move(*s3Res.data))}; #else - throw nix::Error( - "cannot download '%s' because Lix is not built with S3 support", uri - ); + throw nix::Error( + "cannot download '%s' because Lix is not built with S3 support", uri + ); #endif - } - ), - std::async(std::launch::deferred, []{}), - }; } - auto item = enqueueItem(std::make_shared( - *this, - uri, - headers, - getCurActivity(), - std::move(doneCallback), - std::move(dataCallback), - std::move(data), - noBody - )); - return {item->metadataPromise.get_future(), item->doneCallback.get_future()}; - } - - bool exists(const std::string & uri, const Headers & headers) override - { - try { - enqueueFileTransfer(uri, headers, std::nullopt, true).get(); - return true; - } catch (FileTransferError & e) { - /* S3 buckets return 403 if a file doesn't exist and the - bucket is unlistable, so treat 403 as 404. */ - if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden) - return false; - throw; - } - } - - std::pair> - download(const std::string & uri, const Headers & headers) override - { struct State { bool done = false, failed = false; std::exception_ptr exc; @@ -827,9 +747,11 @@ struct curlFileTransfer : public FileTransfer auto _state = std::make_shared>(); - auto [metadataFuture, _done] = enqueueFileTransfer( + auto item = enqueueItem(std::make_shared( + *this, uri, headers, + getCurActivity(), [_state](std::exception_ptr ex) { auto state(_state->lock()); state->done = true; @@ -860,19 +782,19 @@ struct curlFileTransfer : public FileTransfer state->data.append(data); state->avail.notify_one(); }, - std::nullopt, - false - ); + std::move(data), + noBody + )); - struct DownloadSource : Source + struct TransferSource : Source { const std::shared_ptr> _state; std::string chunk; std::string_view buffered; - explicit DownloadSource(const std::shared_ptr> & state) : _state(state) {} + explicit TransferSource(const std::shared_ptr> & state) : _state(state) {} - ~DownloadSource() + ~TransferSource() { // wake up the download thread if it's still going and have it abort auto state(_state->lock()); @@ -927,19 +849,39 @@ struct curlFileTransfer : public FileTransfer } if (total == 0) { - throw EndOfFile("download finished"); + throw EndOfFile("transfer finished"); } return total; } }; - auto metadata = metadataFuture.get(); - auto source = make_box_ptr(_state); + auto metadata = item->metadataPromise.get_future().get(); + auto source = make_box_ptr(_state); auto lock(_state->lock()); source->awaitData(lock); return {std::move(metadata), std::move(source)}; } + + bool exists(const std::string & uri, const Headers & headers) override + { + try { + enqueueFileTransfer(uri, headers, std::nullopt, true); + return true; + } catch (FileTransferError & e) { + /* S3 buckets return 403 if a file doesn't exist and the + bucket is unlistable, so treat 403 as 404. */ + if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden) + return false; + throw; + } + } + + std::pair> + download(const std::string & uri, const Headers & headers) override + { + return enqueueFileTransfer(uri, headers, std::nullopt, false); + } }; ref makeCurlFileTransfer(std::optional baseRetryTimeMs)