forked from lix-project/lix
filetransfer: remove dataCallback from interface
this is highly questionable. single-arg download calls will misbehave
with it set, and two-arg download calls will just overwrite it. being
an implementation detail this should not have been in the API at all.
Change-Id: I613772951ee03d8302366085f06a53601d13f132
This commit is contained in:
parent
e3802c9723
commit
5216ff2745
2 changed files with 38 additions and 29 deletions
|
@ -49,6 +49,7 @@ struct curlFileTransfer : public FileTransfer
|
||||||
Activity act;
|
Activity act;
|
||||||
bool done = false; // whether either the success or failure function has been called
|
bool done = false; // whether either the success or failure function has been called
|
||||||
Callback<FileTransferResult> callback;
|
Callback<FileTransferResult> callback;
|
||||||
|
std::function<void(std::string_view data)> dataCallback;
|
||||||
CURL * req = 0;
|
CURL * req = 0;
|
||||||
bool active = false; // whether the handle has been added to the multi object
|
bool active = false; // whether the handle has been added to the multi object
|
||||||
bool headersProcessed = false;
|
bool headersProcessed = false;
|
||||||
|
@ -82,20 +83,22 @@ struct curlFileTransfer : public FileTransfer
|
||||||
|
|
||||||
TransferItem(curlFileTransfer & fileTransfer,
|
TransferItem(curlFileTransfer & fileTransfer,
|
||||||
const FileTransferRequest & request,
|
const FileTransferRequest & request,
|
||||||
Callback<FileTransferResult> && callback)
|
Callback<FileTransferResult> && callback,
|
||||||
|
std::function<void(std::string_view data)> dataCallback)
|
||||||
: fileTransfer(fileTransfer)
|
: fileTransfer(fileTransfer)
|
||||||
, request(request)
|
, request(request)
|
||||||
, act(*logger, lvlTalkative, actFileTransfer,
|
, act(*logger, lvlTalkative, actFileTransfer,
|
||||||
fmt(request.data ? "uploading '%s'" : "downloading '%s'", request.uri),
|
fmt(request.data ? "uploading '%s'" : "downloading '%s'", request.uri),
|
||||||
{request.uri}, request.parentAct)
|
{request.uri}, request.parentAct)
|
||||||
, callback(std::move(callback))
|
, callback(std::move(callback))
|
||||||
|
, dataCallback(std::move(dataCallback))
|
||||||
, finalSink([this](std::string_view data) {
|
, finalSink([this](std::string_view data) {
|
||||||
auto httpStatus = getHTTPStatus();
|
auto httpStatus = getHTTPStatus();
|
||||||
/* Only write data to the sink if this is a
|
/* Only write data to the sink if this is a
|
||||||
successful response. */
|
successful response. */
|
||||||
if (successfulStatuses.count(httpStatus) && this->request.dataCallback) {
|
if (successfulStatuses.count(httpStatus) && this->dataCallback) {
|
||||||
writtenToSink += data.size();
|
writtenToSink += data.size();
|
||||||
this->request.dataCallback(data);
|
this->dataCallback(data);
|
||||||
} else
|
} else
|
||||||
this->result.data.append(data);
|
this->result.data.append(data);
|
||||||
})
|
})
|
||||||
|
@ -455,7 +458,7 @@ struct curlFileTransfer : public FileTransfer
|
||||||
ranged requests. */
|
ranged requests. */
|
||||||
if (err == Transient
|
if (err == Transient
|
||||||
&& attempt < request.tries
|
&& attempt < request.tries
|
||||||
&& (!this->request.dataCallback
|
&& (!this->dataCallback
|
||||||
|| writtenToSink == 0
|
|| writtenToSink == 0
|
||||||
|| (acceptRanges && encoding.empty())))
|
|| (acceptRanges && encoding.empty())))
|
||||||
{
|
{
|
||||||
|
@ -665,6 +668,13 @@ struct curlFileTransfer : public FileTransfer
|
||||||
|
|
||||||
void enqueueFileTransfer(const FileTransferRequest & request,
|
void enqueueFileTransfer(const FileTransferRequest & request,
|
||||||
Callback<FileTransferResult> callback) override
|
Callback<FileTransferResult> callback) override
|
||||||
|
{
|
||||||
|
enqueueFileTransfer(request, std::move(callback), {});
|
||||||
|
}
|
||||||
|
|
||||||
|
void enqueueFileTransfer(const FileTransferRequest & request,
|
||||||
|
Callback<FileTransferResult> callback,
|
||||||
|
std::function<void(std::string_view data)> dataCallback)
|
||||||
{
|
{
|
||||||
/* Ugly hack to support s3:// URIs. */
|
/* Ugly hack to support s3:// URIs. */
|
||||||
if (request.uri.starts_with("s3://")) {
|
if (request.uri.starts_with("s3://")) {
|
||||||
|
@ -694,7 +704,9 @@ struct curlFileTransfer : public FileTransfer
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
enqueueItem(std::make_shared<TransferItem>(*this, request, std::move(callback)));
|
enqueueItem(std::make_shared<TransferItem>(
|
||||||
|
*this, request, std::move(callback), std::move(dataCallback)
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
void download(FileTransferRequest && request, Sink & sink) override
|
void download(FileTransferRequest && request, Sink & sink) override
|
||||||
|
@ -724,28 +736,6 @@ struct curlFileTransfer : public FileTransfer
|
||||||
state->request.notify_one();
|
state->request.notify_one();
|
||||||
});
|
});
|
||||||
|
|
||||||
request.dataCallback = [_state](std::string_view data) {
|
|
||||||
|
|
||||||
auto state(_state->lock());
|
|
||||||
|
|
||||||
if (state->quit) return;
|
|
||||||
|
|
||||||
/* If the buffer is full, then go to sleep until the calling
|
|
||||||
thread wakes us up (i.e. when it has removed data from the
|
|
||||||
buffer). We don't wait forever to prevent stalling the
|
|
||||||
download thread. (Hopefully sleeping will throttle the
|
|
||||||
sender.) */
|
|
||||||
if (state->data.size() > 1024 * 1024) {
|
|
||||||
debug("download buffer is full; going to sleep");
|
|
||||||
state.wait_for(state->request, std::chrono::seconds(10));
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Append data to the buffer and wake up the calling
|
|
||||||
thread. */
|
|
||||||
state->data.append(data);
|
|
||||||
state->avail.notify_one();
|
|
||||||
};
|
|
||||||
|
|
||||||
enqueueFileTransfer(request,
|
enqueueFileTransfer(request,
|
||||||
{[_state](std::future<FileTransferResult> fut) {
|
{[_state](std::future<FileTransferResult> fut) {
|
||||||
auto state(_state->lock());
|
auto state(_state->lock());
|
||||||
|
@ -757,7 +747,27 @@ struct curlFileTransfer : public FileTransfer
|
||||||
}
|
}
|
||||||
state->avail.notify_one();
|
state->avail.notify_one();
|
||||||
state->request.notify_one();
|
state->request.notify_one();
|
||||||
}});
|
}},
|
||||||
|
[_state](std::string_view data) {
|
||||||
|
auto state(_state->lock());
|
||||||
|
|
||||||
|
if (state->quit) return;
|
||||||
|
|
||||||
|
/* If the buffer is full, then go to sleep until the calling
|
||||||
|
thread wakes us up (i.e. when it has removed data from the
|
||||||
|
buffer). We don't wait forever to prevent stalling the
|
||||||
|
download thread. (Hopefully sleeping will throttle the
|
||||||
|
sender.) */
|
||||||
|
if (state->data.size() > 1024 * 1024) {
|
||||||
|
debug("download buffer is full; going to sleep");
|
||||||
|
state.wait_for(state->request, std::chrono::seconds(10));
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Append data to the buffer and wake up the calling
|
||||||
|
thread. */
|
||||||
|
state->data.append(data);
|
||||||
|
state->avail.notify_one();
|
||||||
|
});
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
checkInterrupt();
|
checkInterrupt();
|
||||||
|
|
|
@ -61,7 +61,6 @@ struct FileTransferRequest
|
||||||
ActivityId parentAct;
|
ActivityId parentAct;
|
||||||
std::optional<std::string> data;
|
std::optional<std::string> data;
|
||||||
std::string mimeType;
|
std::string mimeType;
|
||||||
std::function<void(std::string_view data)> dataCallback;
|
|
||||||
|
|
||||||
FileTransferRequest(std::string_view uri)
|
FileTransferRequest(std::string_view uri)
|
||||||
: uri(uri), parentAct(getCurActivity()) { }
|
: uri(uri), parentAct(getCurActivity()) { }
|
||||||
|
|
Loading…
Reference in a new issue