diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index aa293d2bd..c9f5d6260 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -49,7 +49,7 @@ struct curlFileTransfer : public FileTransfer Activity act; bool done = false; // whether either the success or failure function has been called Callback callback; - std::function dataCallback; + std::function dataCallback; CURL * req = 0; bool active = false; // whether the handle has been added to the multi object bool headersProcessed = false; @@ -84,7 +84,7 @@ struct curlFileTransfer : public FileTransfer TransferItem(curlFileTransfer & fileTransfer, const FileTransferRequest & request, Callback && callback, - std::function dataCallback) + std::function dataCallback) : fileTransfer(fileTransfer) , request(request) , act(*logger, lvlTalkative, actFileTransfer, @@ -92,16 +92,6 @@ struct curlFileTransfer : public FileTransfer {request.uri}, request.parentAct) , callback(std::move(callback)) , dataCallback(std::move(dataCallback)) - , finalSink([this](std::string_view data) { - auto httpStatus = getHTTPStatus(); - /* Only write data to the sink if this is a - successful response. */ - if (successfulStatuses.count(httpStatus) && this->dataCallback) { - writtenToSink += data.size(); - this->dataCallback(data); - } else - this->result.data.append(data); - }) { requestHeaders = curl_slist_append(requestHeaders, "Accept-Encoding: zstd, br, gzip, deflate, bzip2, xz"); if (!request.expectedETag.empty()) @@ -142,9 +132,6 @@ struct curlFileTransfer : public FileTransfer failEx(std::make_exception_ptr(std::forward(e))); } - LambdaSink finalSink; - std::shared_ptr decompressionSink; - std::exception_ptr writeException; std::optional getHeader(const char * name) @@ -177,12 +164,13 @@ struct curlFileTransfer : public FileTransfer size_t realSize = size * nmemb; result.bodySize += realSize; - if (!decompressionSink) { - decompressionSink = makeDecompressionSink(encoding, finalSink); + if (successfulStatuses.count(getHTTPStatus()) && this->dataCallback) { + writtenToSink += realSize; + dataCallback(*this, {(const char *) contents, realSize}); + } else { + this->result.data.append((const char *) contents, realSize); } - (*decompressionSink)({(char *) contents, realSize}); - return realSize; } catch (...) { writeException = std::current_exception(); @@ -345,14 +333,6 @@ struct curlFileTransfer : public FileTransfer debug("finished %s of '%s'; curl status = %d, HTTP status = %d, body = %d bytes", request.verb(), request.uri, code, httpStatus, result.bodySize); - if (decompressionSink) { - try { - decompressionSink->finish(); - } catch (...) { - writeException = std::current_exception(); - } - } - auto link = getHeader("link"); if (!link) { link = getHeader("x-amz-meta-link"); @@ -372,6 +352,14 @@ struct curlFileTransfer : public FileTransfer result.etag = std::move(*etag); } + // this has to happen here until we can return an actual future. + // wrapping user `callback`s instead is not possible because the + // Callback api expects std::functions, and copying Callbacks is + // not possible due the promises they hold. + if (code == CURLE_OK && !dataCallback) { + result.data = decompress(encoding, result.data); + } + if (writeException) failEx(writeException); @@ -674,7 +662,7 @@ struct curlFileTransfer : public FileTransfer void enqueueFileTransfer(const FileTransferRequest & request, Callback callback, - std::function dataCallback) + std::function dataCallback) { /* Ugly hack to support s3:// URIs. */ if (request.uri.starts_with("s3://")) { @@ -724,6 +712,7 @@ struct curlFileTransfer : public FileTransfer std::exception_ptr exc; std::string data; std::condition_variable avail, request; + std::unique_ptr decompressor; }; auto _state = std::make_shared>(); @@ -748,11 +737,15 @@ struct curlFileTransfer : public FileTransfer state->avail.notify_one(); state->request.notify_one(); }}, - [_state](std::string_view data) { + [_state, &sink](TransferItem & transfer, std::string_view data) { auto state(_state->lock()); if (state->quit) return; + if (!state->decompressor) { + state->decompressor = makeDecompressionSink(transfer.encoding, sink); + } + /* If the buffer is full, then go to sleep until the calling thread wakes us up (i.e. when it has removed data from the buffer). We don't wait forever to prevent stalling the @@ -773,6 +766,7 @@ struct curlFileTransfer : public FileTransfer checkInterrupt(); std::string chunk; + FinishSink * sink = nullptr; /* Grab data if available, otherwise wait for the download thread to wake us up. */ @@ -783,6 +777,9 @@ struct curlFileTransfer : public FileTransfer if (state->quit) { if (state->exc) std::rethrow_exception(state->exc); + if (state->decompressor) { + state->decompressor->finish(); + } return; } @@ -792,6 +789,7 @@ struct curlFileTransfer : public FileTransfer } chunk = std::move(state->data); + sink = state->decompressor.get(); /* Reset state->data after the move, since we check data.empty() */ state->data = ""; @@ -802,7 +800,7 @@ struct curlFileTransfer : public FileTransfer if it's blocked on a full buffer. We don't hold the state lock while doing this to prevent blocking the download thread if sink() takes a long time. */ - sink(chunk); + (*sink)(chunk); } } };