From dfe3baea12d759928b992dec0452f6636d42f990 Mon Sep 17 00:00:00 2001 From: eldritch horrors Date: Thu, 25 Apr 2024 16:04:49 +0200 Subject: [PATCH] filetransfer: make two-arg download abstract this lets each implementation of FileTransfer (of which currently only the one exists at all) implement appropriate handling for its internal behaviours that are not otherwise exposed. in curl this lets us switch the buffer-full handling method from "block the entire curl thread" to "pause just the one transfer", move the non-libcurl body decompression out of the actual curl wrapper (which will let us eventually morph the curl wrapper intto an actual source of Sources), and some other things Change-Id: Id6d3593cde6b4915aab3e90a43b175c103cc3f18 --- src/libstore/filetransfer.cc | 198 +++++++++++++++++------------------ src/libstore/filetransfer.hh | 2 +- 2 files changed, 100 insertions(+), 100 deletions(-) diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index 9dc742220..b4887d9e8 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -696,6 +696,105 @@ struct curlFileTransfer : public FileTransfer enqueueItem(std::make_shared(*this, request, std::move(callback))); } + + void download(FileTransferRequest && request, Sink & sink) override + { + /* Note: we can't call 'sink' via request.dataCallback, because + that would cause the sink to execute on the fileTransfer + thread. If 'sink' is a coroutine, this will fail. Also, if the + sink is expensive (e.g. one that does decompression and writing + to the Nix store), it would stall the download thread too much. + Therefore we use a buffer to communicate data between the + download thread and the calling thread. */ + + struct State { + bool quit = false; + std::exception_ptr exc; + std::string data; + std::condition_variable avail, request; + }; + + auto _state = std::make_shared>(); + + /* In case of an exception, wake up the download thread. FIXME: + abort the download request. */ + Finally finally([&]() { + auto state(_state->lock()); + state->quit = true; + state->request.notify_one(); + }); + + request.dataCallback = [_state](std::string_view data) { + + auto state(_state->lock()); + + if (state->quit) return; + + /* If the buffer is full, then go to sleep until the calling + thread wakes us up (i.e. when it has removed data from the + buffer). We don't wait forever to prevent stalling the + download thread. (Hopefully sleeping will throttle the + sender.) */ + if (state->data.size() > 1024 * 1024) { + debug("download buffer is full; going to sleep"); + state.wait_for(state->request, std::chrono::seconds(10)); + } + + /* Append data to the buffer and wake up the calling + thread. */ + state->data.append(data); + state->avail.notify_one(); + }; + + enqueueFileTransfer(request, + {[_state](std::future fut) { + auto state(_state->lock()); + state->quit = true; + try { + fut.get(); + } catch (...) { + state->exc = std::current_exception(); + } + state->avail.notify_one(); + state->request.notify_one(); + }}); + + while (true) { + checkInterrupt(); + + std::string chunk; + + /* Grab data if available, otherwise wait for the download + thread to wake us up. */ + { + auto state(_state->lock()); + + if (state->data.empty()) { + + if (state->quit) { + if (state->exc) std::rethrow_exception(state->exc); + return; + } + + state.wait(state->avail); + + if (state->data.empty()) continue; + } + + chunk = std::move(state->data); + /* Reset state->data after the move, since we check data.empty() */ + state->data = ""; + + state->request.notify_one(); + } + + /* Flush the data to the sink and wake up the download thread + if it's blocked on a full buffer. We don't hold the state + lock while doing this to prevent blocking the download + thread if sink() takes a long time. */ + sink(chunk); + } + } }; ref makeCurlFileTransfer() @@ -743,105 +842,6 @@ FileTransferResult FileTransfer::upload(const FileTransferRequest & request) return enqueueFileTransfer(request).get(); } -void FileTransfer::download(FileTransferRequest && request, Sink & sink) -{ - /* Note: we can't call 'sink' via request.dataCallback, because - that would cause the sink to execute on the fileTransfer - thread. If 'sink' is a coroutine, this will fail. Also, if the - sink is expensive (e.g. one that does decompression and writing - to the Nix store), it would stall the download thread too much. - Therefore we use a buffer to communicate data between the - download thread and the calling thread. */ - - struct State { - bool quit = false; - std::exception_ptr exc; - std::string data; - std::condition_variable avail, request; - }; - - auto _state = std::make_shared>(); - - /* In case of an exception, wake up the download thread. FIXME: - abort the download request. */ - Finally finally([&]() { - auto state(_state->lock()); - state->quit = true; - state->request.notify_one(); - }); - - request.dataCallback = [_state](std::string_view data) { - - auto state(_state->lock()); - - if (state->quit) return; - - /* If the buffer is full, then go to sleep until the calling - thread wakes us up (i.e. when it has removed data from the - buffer). We don't wait forever to prevent stalling the - download thread. (Hopefully sleeping will throttle the - sender.) */ - if (state->data.size() > 1024 * 1024) { - debug("download buffer is full; going to sleep"); - state.wait_for(state->request, std::chrono::seconds(10)); - } - - /* Append data to the buffer and wake up the calling - thread. */ - state->data.append(data); - state->avail.notify_one(); - }; - - enqueueFileTransfer(request, - {[_state](std::future fut) { - auto state(_state->lock()); - state->quit = true; - try { - fut.get(); - } catch (...) { - state->exc = std::current_exception(); - } - state->avail.notify_one(); - state->request.notify_one(); - }}); - - while (true) { - checkInterrupt(); - - std::string chunk; - - /* Grab data if available, otherwise wait for the download - thread to wake us up. */ - { - auto state(_state->lock()); - - if (state->data.empty()) { - - if (state->quit) { - if (state->exc) std::rethrow_exception(state->exc); - return; - } - - state.wait(state->avail); - - if (state->data.empty()) continue; - } - - chunk = std::move(state->data); - /* Reset state->data after the move, since we check data.empty() */ - state->data = ""; - - state->request.notify_one(); - } - - /* Flush the data to the sink and wake up the download thread - if it's blocked on a full buffer. We don't hold the state - lock while doing this to prevent blocking the download - thread if sink() takes a long time. */ - sink(chunk); - } -} - template FileTransferError::FileTransferError(FileTransfer::Error error, std::optional response, const Args & ... args) : Error(args...), error(error), response(response) diff --git a/src/libstore/filetransfer.hh b/src/libstore/filetransfer.hh index 6c11c14ee..c724005d7 100644 --- a/src/libstore/filetransfer.hh +++ b/src/libstore/filetransfer.hh @@ -115,7 +115,7 @@ struct FileTransfer * Download a file, writing its data to a sink. The sink will be * invoked on the thread of the caller. */ - void download(FileTransferRequest && request, Sink & sink); + virtual void download(FileTransferRequest && request, Sink & sink) = 0; enum Error { NotFound, Forbidden, Misc, Transient, Interrupted }; };