From 14eff10fe454c0a88bd52474ecf608311df34ef4 Mon Sep 17 00:00:00 2001 From: eldritch horrors Date: Mon, 28 Oct 2024 18:59:43 +0100 Subject: [PATCH] libstore: split `callback` into metadata and finished parts this will let us return metadata from FileTransfer::download, which in turn is necessary to remove enqueueDownload. it also opens avenues for streaming downloads that keep download metadata instead of dropping it Change-Id: If0fc6af5eb2aeb689fc866c345c9d7bce4d59f2d --- src/libstore/filetransfer.cc | 93 ++++++++++++++++++++---------------- 1 file changed, 53 insertions(+), 40 deletions(-) diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index 3f58d147e..73bae6cfe 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -60,7 +60,8 @@ struct curlFileTransfer : public FileTransfer /// transfer complete, result or failure reported transferComplete, } phase = initialSetup; - std::packaged_task callback; + std::promise metadataPromise; + std::packaged_task doneCallback; std::function dataCallback; CURL * req; // must never be nullptr std::string statusMsg; @@ -100,7 +101,7 @@ struct curlFileTransfer : public FileTransfer const std::string & uri, const Headers & headers, ActivityId parentAct, - std::invocable auto callback, + std::invocable auto doneCallback, std::function dataCallback, std::optional uploadData, bool noBody @@ -112,9 +113,8 @@ struct curlFileTransfer : public FileTransfer {uri}, parentAct) , uploadData(std::move(uploadData)) , noBody(noBody) - , callback([cb{std::move(callback)}] (std::exception_ptr ex, FileTransferResult r) { + , doneCallback([cb{std::move(doneCallback)}] (std::exception_ptr ex) { cb(ex); - return r; }) , dataCallback(std::move(dataCallback)) , req(curl_easy_init()) @@ -143,8 +143,11 @@ struct curlFileTransfer : public FileTransfer void failEx(std::exception_ptr ex) { assert(phase != transferComplete); + if (phase == initialSetup) { + metadataPromise.set_exception(ex); + } phase = transferComplete; - callback(ex, std::move(result)); + doneCallback(ex); } template @@ -178,6 +181,9 @@ struct curlFileTransfer : public FileTransfer result.cached = getHTTPStatus() == 304; + if (phase == initialSetup) { + metadataPromise.set_value(result); + } phase = transferring; } @@ -399,7 +405,7 @@ struct curlFileTransfer : public FileTransfer { act.progress(bodySize, bodySize); phase = transferComplete; - callback(nullptr, std::move(result)); + doneCallback(nullptr); } else { @@ -718,7 +724,7 @@ struct curlFileTransfer : public FileTransfer auto _state = std::make_shared>(); - auto transfer = enqueueFileTransfer( + auto [meta, done] = enqueueFileTransfer( uri, headers, [](std::exception_ptr ex) { @@ -733,17 +739,21 @@ struct curlFileTransfer : public FileTransfer noBody ); - return std::async(std::launch::deferred, [_state, transfer{std::move(transfer)}]() mutable { - auto result = transfer.get(); - auto state(_state->lock()); - return std::pair(std::move(result), std::move(state->data)); - }); + return std::async( + std::launch::deferred, + [_state, _meta{std::move(meta)}, done{std::move(done)}]() mutable { + auto meta = _meta.get(); + done.get(); + auto state(_state->lock()); + return std::pair(std::move(meta), std::move(state->data)); + } + ); } - std::future enqueueFileTransfer( + std::pair, std::future> enqueueFileTransfer( const std::string & uri, const Headers & headers, - std::invocable auto callback, + std::invocable auto doneCallback, std::function dataCallback, std::optional data, bool noBody @@ -752,46 +762,49 @@ struct curlFileTransfer : public FileTransfer /* Ugly hack to support s3:// URIs. */ if (uri.starts_with("s3://")) { // FIXME: do this on a worker thread - return std::async( - std::launch::deferred, - [uri, dataCallback] { + return { + std::async( + std::launch::deferred, + [uri, dataCallback] { #if ENABLE_S3 - auto [bucketName, key, params] = parseS3Uri(uri); + auto [bucketName, key, params] = parseS3Uri(uri); - std::string profile = getOr(params, "profile", ""); - std::string region = getOr(params, "region", Aws::Region::US_EAST_1); - std::string scheme = getOr(params, "scheme", ""); - std::string endpoint = getOr(params, "endpoint", ""); + std::string profile = getOr(params, "profile", ""); + std::string region = getOr(params, "region", Aws::Region::US_EAST_1); + std::string scheme = getOr(params, "scheme", ""); + std::string endpoint = getOr(params, "endpoint", ""); - S3Helper s3Helper(profile, region, scheme, endpoint); + S3Helper s3Helper(profile, region, scheme, endpoint); - // FIXME: implement ETag - auto s3Res = s3Helper.getObject(bucketName, key); - FileTransferResult res; - if (!s3Res.data) - throw FileTransferError(NotFound, "S3 object '%s' does not exist", uri); - dataCallback(*s3Res.data); - return res; + // FIXME: implement ETag + auto s3Res = s3Helper.getObject(bucketName, key); + FileTransferResult res; + if (!s3Res.data) + throw FileTransferError(NotFound, "S3 object '%s' does not exist", uri); + dataCallback(*s3Res.data); + return res; #else - throw nix::Error( - "cannot download '%s' because Lix is not built with S3 support", uri - ); + throw nix::Error( + "cannot download '%s' because Lix is not built with S3 support", uri + ); #endif - } - ); + } + ), + std::async(std::launch::deferred, []{}), + }; } - return enqueueItem(std::make_shared( + auto item = enqueueItem(std::make_shared( *this, uri, headers, getCurActivity(), - std::move(callback), + std::move(doneCallback), std::move(dataCallback), std::move(data), noBody - )) - ->callback.get_future(); + )); + return {item->metadataPromise.get_future(), item->doneCallback.get_future()}; } bool exists(const std::string & uri, const Headers & headers) override @@ -819,7 +832,7 @@ struct curlFileTransfer : public FileTransfer auto _state = std::make_shared>(); - enqueueFileTransfer( + auto transfer = enqueueFileTransfer( uri, headers, [_state](std::exception_ptr ex) {