libstore: split callback into metadata and finished parts

this will let us return metadata from FileTransfer::download, which in
turn is necessary to remove enqueueDownload. it also opens avenues for
streaming downloads that keep download metadata instead of dropping it

Change-Id: If0fc6af5eb2aeb689fc866c345c9d7bce4d59f2d
This commit is contained in:
eldritch horrors 2024-10-28 18:59:43 +01:00
parent 923abe347c
commit 14eff10fe4

View file

@ -60,7 +60,8 @@ struct curlFileTransfer : public FileTransfer
/// transfer complete, result or failure reported
transferComplete,
} phase = initialSetup;
std::packaged_task<FileTransferResult(std::exception_ptr, FileTransferResult)> callback;
std::promise<FileTransferResult> metadataPromise;
std::packaged_task<void(std::exception_ptr)> doneCallback;
std::function<void(std::string_view data)> dataCallback;
CURL * req; // must never be nullptr
std::string statusMsg;
@ -100,7 +101,7 @@ struct curlFileTransfer : public FileTransfer
const std::string & uri,
const Headers & headers,
ActivityId parentAct,
std::invocable<std::exception_ptr> auto callback,
std::invocable<std::exception_ptr> auto doneCallback,
std::function<void(std::string_view data)> dataCallback,
std::optional<std::string> uploadData,
bool noBody
@ -112,9 +113,8 @@ struct curlFileTransfer : public FileTransfer
{uri}, parentAct)
, uploadData(std::move(uploadData))
, noBody(noBody)
, callback([cb{std::move(callback)}] (std::exception_ptr ex, FileTransferResult r) {
, doneCallback([cb{std::move(doneCallback)}] (std::exception_ptr ex) {
cb(ex);
return r;
})
, dataCallback(std::move(dataCallback))
, req(curl_easy_init())
@ -143,8 +143,11 @@ struct curlFileTransfer : public FileTransfer
void failEx(std::exception_ptr ex)
{
assert(phase != transferComplete);
if (phase == initialSetup) {
metadataPromise.set_exception(ex);
}
phase = transferComplete;
callback(ex, std::move(result));
doneCallback(ex);
}
template<class T>
@ -178,6 +181,9 @@ struct curlFileTransfer : public FileTransfer
result.cached = getHTTPStatus() == 304;
if (phase == initialSetup) {
metadataPromise.set_value(result);
}
phase = transferring;
}
@ -399,7 +405,7 @@ struct curlFileTransfer : public FileTransfer
{
act.progress(bodySize, bodySize);
phase = transferComplete;
callback(nullptr, std::move(result));
doneCallback(nullptr);
}
else {
@ -718,7 +724,7 @@ struct curlFileTransfer : public FileTransfer
auto _state = std::make_shared<Sync<State>>();
auto transfer = enqueueFileTransfer(
auto [meta, done] = enqueueFileTransfer(
uri,
headers,
[](std::exception_ptr ex) {
@ -733,17 +739,21 @@ struct curlFileTransfer : public FileTransfer
noBody
);
return std::async(std::launch::deferred, [_state, transfer{std::move(transfer)}]() mutable {
auto result = transfer.get();
auto state(_state->lock());
return std::pair(std::move(result), std::move(state->data));
});
return std::async(
std::launch::deferred,
[_state, _meta{std::move(meta)}, done{std::move(done)}]() mutable {
auto meta = _meta.get();
done.get();
auto state(_state->lock());
return std::pair(std::move(meta), std::move(state->data));
}
);
}
std::future<FileTransferResult> enqueueFileTransfer(
std::pair<std::future<FileTransferResult>, std::future<void>> enqueueFileTransfer(
const std::string & uri,
const Headers & headers,
std::invocable<std::exception_ptr> auto callback,
std::invocable<std::exception_ptr> auto doneCallback,
std::function<void(std::string_view data)> dataCallback,
std::optional<std::string> data,
bool noBody
@ -752,46 +762,49 @@ struct curlFileTransfer : public FileTransfer
/* Ugly hack to support s3:// URIs. */
if (uri.starts_with("s3://")) {
// FIXME: do this on a worker thread
return std::async(
std::launch::deferred,
[uri, dataCallback] {
return {
std::async(
std::launch::deferred,
[uri, dataCallback] {
#if ENABLE_S3
auto [bucketName, key, params] = parseS3Uri(uri);
auto [bucketName, key, params] = parseS3Uri(uri);
std::string profile = getOr(params, "profile", "");
std::string region = getOr(params, "region", Aws::Region::US_EAST_1);
std::string scheme = getOr(params, "scheme", "");
std::string endpoint = getOr(params, "endpoint", "");
std::string profile = getOr(params, "profile", "");
std::string region = getOr(params, "region", Aws::Region::US_EAST_1);
std::string scheme = getOr(params, "scheme", "");
std::string endpoint = getOr(params, "endpoint", "");
S3Helper s3Helper(profile, region, scheme, endpoint);
S3Helper s3Helper(profile, region, scheme, endpoint);
// FIXME: implement ETag
auto s3Res = s3Helper.getObject(bucketName, key);
FileTransferResult res;
if (!s3Res.data)
throw FileTransferError(NotFound, "S3 object '%s' does not exist", uri);
dataCallback(*s3Res.data);
return res;
// FIXME: implement ETag
auto s3Res = s3Helper.getObject(bucketName, key);
FileTransferResult res;
if (!s3Res.data)
throw FileTransferError(NotFound, "S3 object '%s' does not exist", uri);
dataCallback(*s3Res.data);
return res;
#else
throw nix::Error(
"cannot download '%s' because Lix is not built with S3 support", uri
);
throw nix::Error(
"cannot download '%s' because Lix is not built with S3 support", uri
);
#endif
}
);
}
),
std::async(std::launch::deferred, []{}),
};
}
return enqueueItem(std::make_shared<TransferItem>(
auto item = enqueueItem(std::make_shared<TransferItem>(
*this,
uri,
headers,
getCurActivity(),
std::move(callback),
std::move(doneCallback),
std::move(dataCallback),
std::move(data),
noBody
))
->callback.get_future();
));
return {item->metadataPromise.get_future(), item->doneCallback.get_future()};
}
bool exists(const std::string & uri, const Headers & headers) override
@ -819,7 +832,7 @@ struct curlFileTransfer : public FileTransfer
auto _state = std::make_shared<Sync<State>>();
enqueueFileTransfer(
auto transfer = enqueueFileTransfer(
uri,
headers,
[_state](std::exception_ptr ex) {