diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index 9dc742220..b4887d9e8 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -696,6 +696,105 @@ struct curlFileTransfer : public FileTransfer enqueueItem(std::make_shared(*this, request, std::move(callback))); } + + void download(FileTransferRequest && request, Sink & sink) override + { + /* Note: we can't call 'sink' via request.dataCallback, because + that would cause the sink to execute on the fileTransfer + thread. If 'sink' is a coroutine, this will fail. Also, if the + sink is expensive (e.g. one that does decompression and writing + to the Nix store), it would stall the download thread too much. + Therefore we use a buffer to communicate data between the + download thread and the calling thread. */ + + struct State { + bool quit = false; + std::exception_ptr exc; + std::string data; + std::condition_variable avail, request; + }; + + auto _state = std::make_shared>(); + + /* In case of an exception, wake up the download thread. FIXME: + abort the download request. */ + Finally finally([&]() { + auto state(_state->lock()); + state->quit = true; + state->request.notify_one(); + }); + + request.dataCallback = [_state](std::string_view data) { + + auto state(_state->lock()); + + if (state->quit) return; + + /* If the buffer is full, then go to sleep until the calling + thread wakes us up (i.e. when it has removed data from the + buffer). We don't wait forever to prevent stalling the + download thread. (Hopefully sleeping will throttle the + sender.) */ + if (state->data.size() > 1024 * 1024) { + debug("download buffer is full; going to sleep"); + state.wait_for(state->request, std::chrono::seconds(10)); + } + + /* Append data to the buffer and wake up the calling + thread. */ + state->data.append(data); + state->avail.notify_one(); + }; + + enqueueFileTransfer(request, + {[_state](std::future fut) { + auto state(_state->lock()); + state->quit = true; + try { + fut.get(); + } catch (...) { + state->exc = std::current_exception(); + } + state->avail.notify_one(); + state->request.notify_one(); + }}); + + while (true) { + checkInterrupt(); + + std::string chunk; + + /* Grab data if available, otherwise wait for the download + thread to wake us up. */ + { + auto state(_state->lock()); + + if (state->data.empty()) { + + if (state->quit) { + if (state->exc) std::rethrow_exception(state->exc); + return; + } + + state.wait(state->avail); + + if (state->data.empty()) continue; + } + + chunk = std::move(state->data); + /* Reset state->data after the move, since we check data.empty() */ + state->data = ""; + + state->request.notify_one(); + } + + /* Flush the data to the sink and wake up the download thread + if it's blocked on a full buffer. We don't hold the state + lock while doing this to prevent blocking the download + thread if sink() takes a long time. */ + sink(chunk); + } + } }; ref makeCurlFileTransfer() @@ -743,105 +842,6 @@ FileTransferResult FileTransfer::upload(const FileTransferRequest & request) return enqueueFileTransfer(request).get(); } -void FileTransfer::download(FileTransferRequest && request, Sink & sink) -{ - /* Note: we can't call 'sink' via request.dataCallback, because - that would cause the sink to execute on the fileTransfer - thread. If 'sink' is a coroutine, this will fail. Also, if the - sink is expensive (e.g. one that does decompression and writing - to the Nix store), it would stall the download thread too much. - Therefore we use a buffer to communicate data between the - download thread and the calling thread. */ - - struct State { - bool quit = false; - std::exception_ptr exc; - std::string data; - std::condition_variable avail, request; - }; - - auto _state = std::make_shared>(); - - /* In case of an exception, wake up the download thread. FIXME: - abort the download request. */ - Finally finally([&]() { - auto state(_state->lock()); - state->quit = true; - state->request.notify_one(); - }); - - request.dataCallback = [_state](std::string_view data) { - - auto state(_state->lock()); - - if (state->quit) return; - - /* If the buffer is full, then go to sleep until the calling - thread wakes us up (i.e. when it has removed data from the - buffer). We don't wait forever to prevent stalling the - download thread. (Hopefully sleeping will throttle the - sender.) */ - if (state->data.size() > 1024 * 1024) { - debug("download buffer is full; going to sleep"); - state.wait_for(state->request, std::chrono::seconds(10)); - } - - /* Append data to the buffer and wake up the calling - thread. */ - state->data.append(data); - state->avail.notify_one(); - }; - - enqueueFileTransfer(request, - {[_state](std::future fut) { - auto state(_state->lock()); - state->quit = true; - try { - fut.get(); - } catch (...) { - state->exc = std::current_exception(); - } - state->avail.notify_one(); - state->request.notify_one(); - }}); - - while (true) { - checkInterrupt(); - - std::string chunk; - - /* Grab data if available, otherwise wait for the download - thread to wake us up. */ - { - auto state(_state->lock()); - - if (state->data.empty()) { - - if (state->quit) { - if (state->exc) std::rethrow_exception(state->exc); - return; - } - - state.wait(state->avail); - - if (state->data.empty()) continue; - } - - chunk = std::move(state->data); - /* Reset state->data after the move, since we check data.empty() */ - state->data = ""; - - state->request.notify_one(); - } - - /* Flush the data to the sink and wake up the download thread - if it's blocked on a full buffer. We don't hold the state - lock while doing this to prevent blocking the download - thread if sink() takes a long time. */ - sink(chunk); - } -} - template FileTransferError::FileTransferError(FileTransfer::Error error, std::optional response, const Args & ... args) : Error(args...), error(error), response(response) diff --git a/src/libstore/filetransfer.hh b/src/libstore/filetransfer.hh index 6c11c14ee..c724005d7 100644 --- a/src/libstore/filetransfer.hh +++ b/src/libstore/filetransfer.hh @@ -115,7 +115,7 @@ struct FileTransfer * Download a file, writing its data to a sink. The sink will be * invoked on the thread of the caller. */ - void download(FileTransferRequest && request, Sink & sink); + virtual void download(FileTransferRequest && request, Sink & sink) = 0; enum Error { NotFound, Forbidden, Misc, Transient, Interrupted }; };