libstore: reunify all file transfer methods again

with the api cleaned up we can suddenly reunify uploads, downloads, and
existence checks through curl in the same wrapper function. uploads and
existence checks simply don't use the result source, and given that all
transfers (or at least *most* transfers to date) go through the network
the few extra allocations do not hurt us at all. even for file:// calls
the overhead won't much matter as going to disk and back *is* expensive

Change-Id: I4f9ca6681a8fc303377b4cf4c63e3363ae32c18b
This commit is contained in:
eldritch horrors 2024-10-28 18:59:43 +01:00
parent d65838a900
commit c83b13eafd

View file

@ -702,122 +702,42 @@ struct curlFileTransfer : public FileTransfer
void upload(const std::string & uri, std::string data, const Headers & headers) override void upload(const std::string & uri, std::string data, const Headers & headers) override
{ {
enqueueFileTransfer(uri, headers, std::move(data), false).get(); enqueueFileTransfer(uri, headers, std::move(data), false);
} }
std::future<std::pair<FileTransferResult, std::string>> enqueueFileTransfer( std::pair<FileTransferResult, box_ptr<Source>> enqueueFileTransfer(
const std::string & uri, const std::string & uri,
const Headers & headers, const Headers & headers,
std::optional<std::string> data, std::optional<std::string> data,
bool noBody bool noBody
) )
{
struct State {
std::string data;
};
auto _state = std::make_shared<Sync<State>>();
auto [meta, done] = enqueueFileTransfer(
uri,
headers,
[](std::exception_ptr ex) {
if (ex) {
std::rethrow_exception(ex);
}
},
[_state](std::string_view data) {
_state->lock()->data.append(data);
},
std::move(data),
noBody
);
return std::async(
std::launch::deferred,
[_state, _meta{std::move(meta)}, done{std::move(done)}]() mutable {
auto meta = _meta.get();
done.get();
auto state(_state->lock());
return std::pair(std::move(meta), std::move(state->data));
}
);
}
std::pair<std::future<FileTransferResult>, std::future<void>> enqueueFileTransfer(
const std::string & uri,
const Headers & headers,
std::invocable<std::exception_ptr> auto doneCallback,
std::function<void(std::string_view data)> dataCallback,
std::optional<std::string> data,
bool noBody
)
{ {
/* Ugly hack to support s3:// URIs. */ /* Ugly hack to support s3:// URIs. */
if (uri.starts_with("s3://")) { if (uri.starts_with("s3://")) {
// FIXME: do this on a worker thread // FIXME: do this on a worker thread
return {
std::async(
std::launch::deferred,
[uri, dataCallback] {
#if ENABLE_S3 #if ENABLE_S3
auto [bucketName, key, params] = parseS3Uri(uri); auto [bucketName, key, params] = parseS3Uri(uri);
std::string profile = getOr(params, "profile", ""); std::string profile = getOr(params, "profile", "");
std::string region = getOr(params, "region", Aws::Region::US_EAST_1); std::string region = getOr(params, "region", Aws::Region::US_EAST_1);
std::string scheme = getOr(params, "scheme", ""); std::string scheme = getOr(params, "scheme", "");
std::string endpoint = getOr(params, "endpoint", ""); std::string endpoint = getOr(params, "endpoint", "");
S3Helper s3Helper(profile, region, scheme, endpoint); S3Helper s3Helper(profile, region, scheme, endpoint);
// FIXME: implement ETag // FIXME: implement ETag
auto s3Res = s3Helper.getObject(bucketName, key); auto s3Res = s3Helper.getObject(bucketName, key);
FileTransferResult res; FileTransferResult res;
if (!s3Res.data) if (!s3Res.data)
throw FileTransferError(NotFound, "S3 object '%s' does not exist", uri); throw FileTransferError(NotFound, "S3 object '%s' does not exist", uri);
dataCallback(*s3Res.data); return {res, make_box_ptr<StringSource>(std::move(*s3Res.data))};
return res;
#else #else
throw nix::Error( throw nix::Error(
"cannot download '%s' because Lix is not built with S3 support", uri "cannot download '%s' because Lix is not built with S3 support", uri
); );
#endif #endif
}
),
std::async(std::launch::deferred, []{}),
};
} }
auto item = enqueueItem(std::make_shared<TransferItem>(
*this,
uri,
headers,
getCurActivity(),
std::move(doneCallback),
std::move(dataCallback),
std::move(data),
noBody
));
return {item->metadataPromise.get_future(), item->doneCallback.get_future()};
}
bool exists(const std::string & uri, const Headers & headers) override
{
try {
enqueueFileTransfer(uri, headers, std::nullopt, true).get();
return true;
} catch (FileTransferError & e) {
/* S3 buckets return 403 if a file doesn't exist and the
bucket is unlistable, so treat 403 as 404. */
if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden)
return false;
throw;
}
}
std::pair<FileTransferResult, box_ptr<Source>>
download(const std::string & uri, const Headers & headers) override
{
struct State { struct State {
bool done = false, failed = false; bool done = false, failed = false;
std::exception_ptr exc; std::exception_ptr exc;
@ -827,9 +747,11 @@ struct curlFileTransfer : public FileTransfer
auto _state = std::make_shared<Sync<State>>(); auto _state = std::make_shared<Sync<State>>();
auto [metadataFuture, _done] = enqueueFileTransfer( auto item = enqueueItem(std::make_shared<TransferItem>(
*this,
uri, uri,
headers, headers,
getCurActivity(),
[_state](std::exception_ptr ex) { [_state](std::exception_ptr ex) {
auto state(_state->lock()); auto state(_state->lock());
state->done = true; state->done = true;
@ -860,19 +782,19 @@ struct curlFileTransfer : public FileTransfer
state->data.append(data); state->data.append(data);
state->avail.notify_one(); state->avail.notify_one();
}, },
std::nullopt, std::move(data),
false noBody
); ));
struct DownloadSource : Source struct TransferSource : Source
{ {
const std::shared_ptr<Sync<State>> _state; const std::shared_ptr<Sync<State>> _state;
std::string chunk; std::string chunk;
std::string_view buffered; std::string_view buffered;
explicit DownloadSource(const std::shared_ptr<Sync<State>> & state) : _state(state) {} explicit TransferSource(const std::shared_ptr<Sync<State>> & state) : _state(state) {}
~DownloadSource() ~TransferSource()
{ {
// wake up the download thread if it's still going and have it abort // wake up the download thread if it's still going and have it abort
auto state(_state->lock()); auto state(_state->lock());
@ -927,19 +849,39 @@ struct curlFileTransfer : public FileTransfer
} }
if (total == 0) { if (total == 0) {
throw EndOfFile("download finished"); throw EndOfFile("transfer finished");
} }
return total; return total;
} }
}; };
auto metadata = metadataFuture.get(); auto metadata = item->metadataPromise.get_future().get();
auto source = make_box_ptr<DownloadSource>(_state); auto source = make_box_ptr<TransferSource>(_state);
auto lock(_state->lock()); auto lock(_state->lock());
source->awaitData(lock); source->awaitData(lock);
return {std::move(metadata), std::move(source)}; return {std::move(metadata), std::move(source)};
} }
bool exists(const std::string & uri, const Headers & headers) override
{
try {
enqueueFileTransfer(uri, headers, std::nullopt, true);
return true;
} catch (FileTransferError & e) {
/* S3 buckets return 403 if a file doesn't exist and the
bucket is unlistable, so treat 403 as 404. */
if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden)
return false;
throw;
}
}
std::pair<FileTransferResult, box_ptr<Source>>
download(const std::string & uri, const Headers & headers) override
{
return enqueueFileTransfer(uri, headers, std::nullopt, false);
}
}; };
ref<curlFileTransfer> makeCurlFileTransfer(std::optional<unsigned int> baseRetryTimeMs) ref<curlFileTransfer> makeCurlFileTransfer(std::optional<unsigned int> baseRetryTimeMs)