libstore: simplify download buffer handling

keep the download buffer in TransferItem. keep only *one* download
buffer (not one for success returns and one for failures). we also
remove the asynchronous callbacks since they are no longer needed.

Change-Id: I9b1c1e4c59d5333602aea1c36e477c481eaaf98e
This commit is contained in:
eldritch horrors 2024-11-12 14:01:46 +01:00
parent 7d7e16ecec
commit 43777939eb

View file

@ -42,12 +42,20 @@ struct curlFileTransfer : public FileTransfer
struct TransferItem : public std::enable_shared_from_this<TransferItem> struct TransferItem : public std::enable_shared_from_this<TransferItem>
{ {
struct DownloadState
{
bool done = false;
std::exception_ptr exc;
std::string data;
};
curlFileTransfer & fileTransfer; curlFileTransfer & fileTransfer;
std::string uri; std::string uri;
FileTransferResult result; FileTransferResult result;
Activity act; Activity act;
std::unique_ptr<FILE, decltype([](FILE * f) { fclose(f); })> uploadData; std::unique_ptr<FILE, decltype([](FILE * f) { fclose(f); })> uploadData;
std::string downloadData; Sync<DownloadState> downloadState;
std::condition_variable downloadEvent;
enum { enum {
/// nothing has been transferred yet /// nothing has been transferred yet
initialSetup, initialSetup,
@ -57,9 +65,6 @@ struct curlFileTransfer : public FileTransfer
transferComplete, transferComplete,
} phase = initialSetup; } phase = initialSetup;
std::promise<FileTransferResult> metadataPromise; std::promise<FileTransferResult> metadataPromise;
std::packaged_task<void(std::exception_ptr)> doneCallback;
// return false from dataCallback to pause the transfer without consuming data
std::function<bool(std::string_view data)> dataCallback;
CURL * req; // must never be nullptr CURL * req; // must never be nullptr
std::string statusMsg; std::string statusMsg;
@ -88,8 +93,6 @@ struct curlFileTransfer : public FileTransfer
const std::string & uri, const std::string & uri,
const Headers & headers, const Headers & headers,
ActivityId parentAct, ActivityId parentAct,
std::invocable<std::exception_ptr> auto doneCallback,
std::function<bool(std::string_view data)> dataCallback,
std::optional<std::string_view> uploadData, std::optional<std::string_view> uploadData,
bool noBody, bool noBody,
curl_off_t writtenToSink curl_off_t writtenToSink
@ -99,10 +102,6 @@ struct curlFileTransfer : public FileTransfer
, act(*logger, lvlTalkative, actFileTransfer, , act(*logger, lvlTalkative, actFileTransfer,
fmt(uploadData ? "uploading '%s'" : "downloading '%s'", uri), fmt(uploadData ? "uploading '%s'" : "downloading '%s'", uri),
{uri}, parentAct) {uri}, parentAct)
, doneCallback([cb{std::move(doneCallback)}] (std::exception_ptr ex) {
cb(ex);
})
, dataCallback(std::move(dataCallback))
, req(curl_easy_init()) , req(curl_easy_init())
{ {
if (req == nullptr) { if (req == nullptr) {
@ -204,7 +203,8 @@ struct curlFileTransfer : public FileTransfer
metadataPromise.set_exception(ex); metadataPromise.set_exception(ex);
} }
phase = transferComplete; phase = transferComplete;
doneCallback(ex); downloadState.lock()->exc = ex;
downloadEvent.notify_all();
} }
template<class T> template<class T>
@ -242,14 +242,16 @@ struct curlFileTransfer : public FileTransfer
try { try {
maybeFinishSetup(); maybeFinishSetup();
if (successfulStatuses.count(getHTTPStatus()) && this->dataCallback) { auto state = downloadState.lock();
if (!dataCallback({static_cast<const char *>(contents), realSize})) {
// when the buffer is full (as determined by a historical magic value) we
// pause the transfer and wait for the receiver to unpause it when ready.
if (successfulStatuses.count(getHTTPStatus()) && state->data.size() > 1024 * 1024) {
return CURL_WRITEFUNC_PAUSE; return CURL_WRITEFUNC_PAUSE;
} }
} else {
this->downloadData.append(static_cast<const char *>(contents), realSize);
}
state->data.append(static_cast<const char *>(contents), realSize);
downloadEvent.notify_all();
bodySize += realSize; bodySize += realSize;
return realSize; return realSize;
} catch (...) { } catch (...) {
@ -343,7 +345,8 @@ struct curlFileTransfer : public FileTransfer
{ {
act.progress(bodySize, bodySize); act.progress(bodySize, bodySize);
phase = transferComplete; phase = transferComplete;
doneCallback(nullptr); downloadState.lock()->done = true;
downloadEvent.notify_all();
} }
else { else {
@ -398,7 +401,7 @@ struct curlFileTransfer : public FileTransfer
std::optional<std::string> response; std::optional<std::string> response;
if (!successfulStatuses.count(httpStatus)) if (!successfulStatuses.count(httpStatus))
response = std::move(downloadData); response = std::move(downloadState.lock()->data);
auto exc = auto exc =
code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted
? FileTransferError(Interrupted, std::move(response), "%s of '%s' was interrupted", verb(), uri) ? FileTransferError(Interrupted, std::move(response), "%s of '%s' was interrupted", verb(), uri)
@ -742,13 +745,6 @@ struct curlFileTransfer : public FileTransfer
struct TransferSource : Source struct TransferSource : Source
{ {
struct State {
bool done = false;
std::exception_ptr exc;
std::string data;
std::condition_variable avail;
};
curlFileTransfer & parent; curlFileTransfer & parent;
std::string uri; std::string uri;
Headers headers; Headers headers;
@ -756,7 +752,6 @@ struct curlFileTransfer : public FileTransfer
bool noBody; bool noBody;
ActivityId parentAct = getCurActivity(); ActivityId parentAct = getCurActivity();
const std::shared_ptr<Sync<State>> _state = std::make_shared<Sync<State>>();
std::shared_ptr<TransferItem> transfer; std::shared_ptr<TransferItem> transfer;
FileTransferResult metadata; FileTransferResult metadata;
std::string chunk; std::string chunk;
@ -820,38 +815,9 @@ struct curlFileTransfer : public FileTransfer
FileTransferResult startTransfer(const std::string & uri, curl_off_t offset = 0) FileTransferResult startTransfer(const std::string & uri, curl_off_t offset = 0)
{ {
attempt += 1; attempt += 1;
auto uploadData = data ? std::optional(std::string_view(*data)) : std::nullopt;
transfer = std::make_shared<TransferItem>( transfer = std::make_shared<TransferItem>(
parent, parent, uri, headers, parentAct, uploadData, noBody, offset
uri,
headers,
parentAct,
[_state{_state}](std::exception_ptr ex) {
auto state(_state->lock());
state->done = ex == nullptr;
state->exc = ex;
state->avail.notify_one();
},
[_state{_state}](std::string_view data) {
auto state(_state->lock());
/* If the buffer is full, then go to sleep until the calling
thread wakes us up (i.e. when it has removed data from the
buffer). We don't wait forever to prevent stalling the
download thread. (Hopefully sleeping will throttle the
sender.) */
if (state->data.size() > 1024 * 1024) {
return false;
}
/* Append data to the buffer and wake up the calling
thread. */
state->data.append(data);
state->avail.notify_one();
return true;
},
data ? std::optional(std::string_view(*data)) : std::nullopt,
noBody,
offset
); );
parent.enqueueItem(transfer); parent.enqueueItem(transfer);
return transfer->metadataPromise.get_future().get(); return transfer->metadataPromise.get_future().get();
@ -868,7 +834,7 @@ struct curlFileTransfer : public FileTransfer
bool attemptRetry(const std::string & context) bool attemptRetry(const std::string & context)
{ {
auto state(_state->lock()); auto state(transfer->downloadState.lock());
assert(state->data.empty()); assert(state->data.empty());
@ -907,7 +873,7 @@ struct curlFileTransfer : public FileTransfer
/* Grab data if available, otherwise wait for the download /* Grab data if available, otherwise wait for the download
thread to wake us up. */ thread to wake us up. */
while (buffered.empty()) { while (buffered.empty()) {
auto state(_state->lock()); auto state(transfer->downloadState.lock());
if (state->data.empty()) { if (state->data.empty()) {
if (state->exc) { if (state->exc) {
@ -917,7 +883,7 @@ struct curlFileTransfer : public FileTransfer
} }
transfer->unpause(); transfer->unpause();
state.wait(state->avail); state.wait(transfer->downloadEvent);
} }
chunk = std::move(state->data); chunk = std::move(state->data);