forked from lix-project/lix
filetransfer: don't decompress in curl wrapper itself
only decompress the response once all data has been received (in the
fully buffered case), or at least outside of the curl wrapper itself
(in the receive-to-sink case). unfortunately this means we will have
to duplicate decompression logic for these two cases for time being,
but once the curl wrapper has been rewritten to return a real future
or Source we can deduplicate this logic again. the curl wrapper will
have to turn into a proper Source first and use decompression source
logic which also does not currently exist—only decompression *sinks*
Change-Id: I66bc692f07d9b9e69fe10689ee73a2de8d65e35c
This commit is contained in:
parent
fb0996aaa8
commit
a1ad4e52a6
|
@ -49,7 +49,7 @@ struct curlFileTransfer : public FileTransfer
|
||||||
Activity act;
|
Activity act;
|
||||||
bool done = false; // whether either the success or failure function has been called
|
bool done = false; // whether either the success or failure function has been called
|
||||||
Callback<FileTransferResult> callback;
|
Callback<FileTransferResult> callback;
|
||||||
std::function<void(std::string_view data)> dataCallback;
|
std::function<void(TransferItem &, std::string_view data)> dataCallback;
|
||||||
CURL * req = 0;
|
CURL * req = 0;
|
||||||
bool active = false; // whether the handle has been added to the multi object
|
bool active = false; // whether the handle has been added to the multi object
|
||||||
bool headersProcessed = false;
|
bool headersProcessed = false;
|
||||||
|
@ -84,7 +84,7 @@ struct curlFileTransfer : public FileTransfer
|
||||||
TransferItem(curlFileTransfer & fileTransfer,
|
TransferItem(curlFileTransfer & fileTransfer,
|
||||||
const FileTransferRequest & request,
|
const FileTransferRequest & request,
|
||||||
Callback<FileTransferResult> && callback,
|
Callback<FileTransferResult> && callback,
|
||||||
std::function<void(std::string_view data)> dataCallback)
|
std::function<void(TransferItem &, std::string_view data)> dataCallback)
|
||||||
: fileTransfer(fileTransfer)
|
: fileTransfer(fileTransfer)
|
||||||
, request(request)
|
, request(request)
|
||||||
, act(*logger, lvlTalkative, actFileTransfer,
|
, act(*logger, lvlTalkative, actFileTransfer,
|
||||||
|
@ -92,16 +92,6 @@ struct curlFileTransfer : public FileTransfer
|
||||||
{request.uri}, request.parentAct)
|
{request.uri}, request.parentAct)
|
||||||
, callback(std::move(callback))
|
, callback(std::move(callback))
|
||||||
, dataCallback(std::move(dataCallback))
|
, dataCallback(std::move(dataCallback))
|
||||||
, finalSink([this](std::string_view data) {
|
|
||||||
auto httpStatus = getHTTPStatus();
|
|
||||||
/* Only write data to the sink if this is a
|
|
||||||
successful response. */
|
|
||||||
if (successfulStatuses.count(httpStatus) && this->dataCallback) {
|
|
||||||
writtenToSink += data.size();
|
|
||||||
this->dataCallback(data);
|
|
||||||
} else
|
|
||||||
this->result.data.append(data);
|
|
||||||
})
|
|
||||||
{
|
{
|
||||||
requestHeaders = curl_slist_append(requestHeaders, "Accept-Encoding: zstd, br, gzip, deflate, bzip2, xz");
|
requestHeaders = curl_slist_append(requestHeaders, "Accept-Encoding: zstd, br, gzip, deflate, bzip2, xz");
|
||||||
if (!request.expectedETag.empty())
|
if (!request.expectedETag.empty())
|
||||||
|
@ -142,9 +132,6 @@ struct curlFileTransfer : public FileTransfer
|
||||||
failEx(std::make_exception_ptr(std::forward<T>(e)));
|
failEx(std::make_exception_ptr(std::forward<T>(e)));
|
||||||
}
|
}
|
||||||
|
|
||||||
LambdaSink finalSink;
|
|
||||||
std::shared_ptr<FinishSink> decompressionSink;
|
|
||||||
|
|
||||||
std::exception_ptr writeException;
|
std::exception_ptr writeException;
|
||||||
|
|
||||||
std::optional<std::string> getHeader(const char * name)
|
std::optional<std::string> getHeader(const char * name)
|
||||||
|
@ -177,12 +164,13 @@ struct curlFileTransfer : public FileTransfer
|
||||||
size_t realSize = size * nmemb;
|
size_t realSize = size * nmemb;
|
||||||
result.bodySize += realSize;
|
result.bodySize += realSize;
|
||||||
|
|
||||||
if (!decompressionSink) {
|
if (successfulStatuses.count(getHTTPStatus()) && this->dataCallback) {
|
||||||
decompressionSink = makeDecompressionSink(encoding, finalSink);
|
writtenToSink += realSize;
|
||||||
|
dataCallback(*this, {(const char *) contents, realSize});
|
||||||
|
} else {
|
||||||
|
this->result.data.append((const char *) contents, realSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
(*decompressionSink)({(char *) contents, realSize});
|
|
||||||
|
|
||||||
return realSize;
|
return realSize;
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
writeException = std::current_exception();
|
writeException = std::current_exception();
|
||||||
|
@ -345,14 +333,6 @@ struct curlFileTransfer : public FileTransfer
|
||||||
debug("finished %s of '%s'; curl status = %d, HTTP status = %d, body = %d bytes",
|
debug("finished %s of '%s'; curl status = %d, HTTP status = %d, body = %d bytes",
|
||||||
request.verb(), request.uri, code, httpStatus, result.bodySize);
|
request.verb(), request.uri, code, httpStatus, result.bodySize);
|
||||||
|
|
||||||
if (decompressionSink) {
|
|
||||||
try {
|
|
||||||
decompressionSink->finish();
|
|
||||||
} catch (...) {
|
|
||||||
writeException = std::current_exception();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
auto link = getHeader("link");
|
auto link = getHeader("link");
|
||||||
if (!link) {
|
if (!link) {
|
||||||
link = getHeader("x-amz-meta-link");
|
link = getHeader("x-amz-meta-link");
|
||||||
|
@ -372,6 +352,14 @@ struct curlFileTransfer : public FileTransfer
|
||||||
result.etag = std::move(*etag);
|
result.etag = std::move(*etag);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// this has to happen here until we can return an actual future.
|
||||||
|
// wrapping user `callback`s instead is not possible because the
|
||||||
|
// Callback api expects std::functions, and copying Callbacks is
|
||||||
|
// not possible due the promises they hold.
|
||||||
|
if (code == CURLE_OK && !dataCallback) {
|
||||||
|
result.data = decompress(encoding, result.data);
|
||||||
|
}
|
||||||
|
|
||||||
if (writeException)
|
if (writeException)
|
||||||
failEx(writeException);
|
failEx(writeException);
|
||||||
|
|
||||||
|
@ -674,7 +662,7 @@ struct curlFileTransfer : public FileTransfer
|
||||||
|
|
||||||
void enqueueFileTransfer(const FileTransferRequest & request,
|
void enqueueFileTransfer(const FileTransferRequest & request,
|
||||||
Callback<FileTransferResult> callback,
|
Callback<FileTransferResult> callback,
|
||||||
std::function<void(std::string_view data)> dataCallback)
|
std::function<void(TransferItem &, std::string_view data)> dataCallback)
|
||||||
{
|
{
|
||||||
/* Ugly hack to support s3:// URIs. */
|
/* Ugly hack to support s3:// URIs. */
|
||||||
if (request.uri.starts_with("s3://")) {
|
if (request.uri.starts_with("s3://")) {
|
||||||
|
@ -724,6 +712,7 @@ struct curlFileTransfer : public FileTransfer
|
||||||
std::exception_ptr exc;
|
std::exception_ptr exc;
|
||||||
std::string data;
|
std::string data;
|
||||||
std::condition_variable avail, request;
|
std::condition_variable avail, request;
|
||||||
|
std::unique_ptr<FinishSink> decompressor;
|
||||||
};
|
};
|
||||||
|
|
||||||
auto _state = std::make_shared<Sync<State>>();
|
auto _state = std::make_shared<Sync<State>>();
|
||||||
|
@ -748,11 +737,15 @@ struct curlFileTransfer : public FileTransfer
|
||||||
state->avail.notify_one();
|
state->avail.notify_one();
|
||||||
state->request.notify_one();
|
state->request.notify_one();
|
||||||
}},
|
}},
|
||||||
[_state](std::string_view data) {
|
[_state, &sink](TransferItem & transfer, std::string_view data) {
|
||||||
auto state(_state->lock());
|
auto state(_state->lock());
|
||||||
|
|
||||||
if (state->quit) return;
|
if (state->quit) return;
|
||||||
|
|
||||||
|
if (!state->decompressor) {
|
||||||
|
state->decompressor = makeDecompressionSink(transfer.encoding, sink);
|
||||||
|
}
|
||||||
|
|
||||||
/* If the buffer is full, then go to sleep until the calling
|
/* If the buffer is full, then go to sleep until the calling
|
||||||
thread wakes us up (i.e. when it has removed data from the
|
thread wakes us up (i.e. when it has removed data from the
|
||||||
buffer). We don't wait forever to prevent stalling the
|
buffer). We don't wait forever to prevent stalling the
|
||||||
|
@ -773,6 +766,7 @@ struct curlFileTransfer : public FileTransfer
|
||||||
checkInterrupt();
|
checkInterrupt();
|
||||||
|
|
||||||
std::string chunk;
|
std::string chunk;
|
||||||
|
FinishSink * sink = nullptr;
|
||||||
|
|
||||||
/* Grab data if available, otherwise wait for the download
|
/* Grab data if available, otherwise wait for the download
|
||||||
thread to wake us up. */
|
thread to wake us up. */
|
||||||
|
@ -783,6 +777,9 @@ struct curlFileTransfer : public FileTransfer
|
||||||
|
|
||||||
if (state->quit) {
|
if (state->quit) {
|
||||||
if (state->exc) std::rethrow_exception(state->exc);
|
if (state->exc) std::rethrow_exception(state->exc);
|
||||||
|
if (state->decompressor) {
|
||||||
|
state->decompressor->finish();
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -792,6 +789,7 @@ struct curlFileTransfer : public FileTransfer
|
||||||
}
|
}
|
||||||
|
|
||||||
chunk = std::move(state->data);
|
chunk = std::move(state->data);
|
||||||
|
sink = state->decompressor.get();
|
||||||
/* Reset state->data after the move, since we check data.empty() */
|
/* Reset state->data after the move, since we check data.empty() */
|
||||||
state->data = "";
|
state->data = "";
|
||||||
|
|
||||||
|
@ -802,7 +800,7 @@ struct curlFileTransfer : public FileTransfer
|
||||||
if it's blocked on a full buffer. We don't hold the state
|
if it's blocked on a full buffer. We don't hold the state
|
||||||
lock while doing this to prevent blocking the download
|
lock while doing this to prevent blocking the download
|
||||||
thread if sink() takes a long time. */
|
thread if sink() takes a long time. */
|
||||||
sink(chunk);
|
(*sink)(chunk);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
Loading…
Reference in a new issue