diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index 03b3a045b..48ac39016 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -702,122 +702,42 @@ struct curlFileTransfer : public FileTransfer void upload(const std::string & uri, std::string data, const Headers & headers) override { - enqueueFileTransfer(uri, headers, std::move(data), false).get(); + enqueueFileTransfer(uri, headers, std::move(data), false); } - std::future> enqueueFileTransfer( + std::pair> enqueueFileTransfer( const std::string & uri, const Headers & headers, std::optional data, bool noBody ) - { - struct State { - std::string data; - }; - - auto _state = std::make_shared>(); - - auto [meta, done] = enqueueFileTransfer( - uri, - headers, - [](std::exception_ptr ex) { - if (ex) { - std::rethrow_exception(ex); - } - }, - [_state](std::string_view data) { - _state->lock()->data.append(data); - }, - std::move(data), - noBody - ); - - return std::async( - std::launch::deferred, - [_state, _meta{std::move(meta)}, done{std::move(done)}]() mutable { - auto meta = _meta.get(); - done.get(); - auto state(_state->lock()); - return std::pair(std::move(meta), std::move(state->data)); - } - ); - } - - std::pair, std::future> enqueueFileTransfer( - const std::string & uri, - const Headers & headers, - std::invocable auto doneCallback, - std::function dataCallback, - std::optional data, - bool noBody - ) { /* Ugly hack to support s3:// URIs. */ if (uri.starts_with("s3://")) { // FIXME: do this on a worker thread - return { - std::async( - std::launch::deferred, - [uri, dataCallback] { #if ENABLE_S3 - auto [bucketName, key, params] = parseS3Uri(uri); + auto [bucketName, key, params] = parseS3Uri(uri); - std::string profile = getOr(params, "profile", ""); - std::string region = getOr(params, "region", Aws::Region::US_EAST_1); - std::string scheme = getOr(params, "scheme", ""); - std::string endpoint = getOr(params, "endpoint", ""); + std::string profile = getOr(params, "profile", ""); + std::string region = getOr(params, "region", Aws::Region::US_EAST_1); + std::string scheme = getOr(params, "scheme", ""); + std::string endpoint = getOr(params, "endpoint", ""); - S3Helper s3Helper(profile, region, scheme, endpoint); + S3Helper s3Helper(profile, region, scheme, endpoint); - // FIXME: implement ETag - auto s3Res = s3Helper.getObject(bucketName, key); - FileTransferResult res; - if (!s3Res.data) - throw FileTransferError(NotFound, "S3 object '%s' does not exist", uri); - dataCallback(*s3Res.data); - return res; + // FIXME: implement ETag + auto s3Res = s3Helper.getObject(bucketName, key); + FileTransferResult res; + if (!s3Res.data) + throw FileTransferError(NotFound, "S3 object '%s' does not exist", uri); + return {res, make_box_ptr(std::move(*s3Res.data))}; #else - throw nix::Error( - "cannot download '%s' because Lix is not built with S3 support", uri - ); + throw nix::Error( + "cannot download '%s' because Lix is not built with S3 support", uri + ); #endif - } - ), - std::async(std::launch::deferred, []{}), - }; } - auto item = enqueueItem(std::make_shared( - *this, - uri, - headers, - getCurActivity(), - std::move(doneCallback), - std::move(dataCallback), - std::move(data), - noBody - )); - return {item->metadataPromise.get_future(), item->doneCallback.get_future()}; - } - - bool exists(const std::string & uri, const Headers & headers) override - { - try { - enqueueFileTransfer(uri, headers, std::nullopt, true).get(); - return true; - } catch (FileTransferError & e) { - /* S3 buckets return 403 if a file doesn't exist and the - bucket is unlistable, so treat 403 as 404. */ - if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden) - return false; - throw; - } - } - - std::pair> - download(const std::string & uri, const Headers & headers) override - { struct State { bool done = false, failed = false; std::exception_ptr exc; @@ -827,9 +747,11 @@ struct curlFileTransfer : public FileTransfer auto _state = std::make_shared>(); - auto [metadataFuture, _done] = enqueueFileTransfer( + auto item = enqueueItem(std::make_shared( + *this, uri, headers, + getCurActivity(), [_state](std::exception_ptr ex) { auto state(_state->lock()); state->done = true; @@ -860,19 +782,19 @@ struct curlFileTransfer : public FileTransfer state->data.append(data); state->avail.notify_one(); }, - std::nullopt, - false - ); + std::move(data), + noBody + )); - struct DownloadSource : Source + struct TransferSource : Source { const std::shared_ptr> _state; std::string chunk; std::string_view buffered; - explicit DownloadSource(const std::shared_ptr> & state) : _state(state) {} + explicit TransferSource(const std::shared_ptr> & state) : _state(state) {} - ~DownloadSource() + ~TransferSource() { // wake up the download thread if it's still going and have it abort auto state(_state->lock()); @@ -927,19 +849,39 @@ struct curlFileTransfer : public FileTransfer } if (total == 0) { - throw EndOfFile("download finished"); + throw EndOfFile("transfer finished"); } return total; } }; - auto metadata = metadataFuture.get(); - auto source = make_box_ptr(_state); + auto metadata = item->metadataPromise.get_future().get(); + auto source = make_box_ptr(_state); auto lock(_state->lock()); source->awaitData(lock); return {std::move(metadata), std::move(source)}; } + + bool exists(const std::string & uri, const Headers & headers) override + { + try { + enqueueFileTransfer(uri, headers, std::nullopt, true); + return true; + } catch (FileTransferError & e) { + /* S3 buckets return 403 if a file doesn't exist and the + bucket is unlistable, so treat 403 as 404. */ + if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden) + return false; + throw; + } + } + + std::pair> + download(const std::string & uri, const Headers & headers) override + { + return enqueueFileTransfer(uri, headers, std::nullopt, false); + } }; ref makeCurlFileTransfer(std::optional baseRetryTimeMs)