From d75c399b2916727e25db651c76fa6b4bb89e21d8 Mon Sep 17 00:00:00 2001 From: eldritch horrors Date: Sat, 9 Nov 2024 01:17:28 +0100 Subject: [PATCH] libstore: delocalize TransferSource future changes will need to add template functions to this class, which cannot be done for classes declared locally in unctions. otherwise this is mostly code motion to clean up enqueueFileTransfer a little further. Change-Id: If9a9d9eb47ceadfa75a4eebd54e2db39f2305643 --- src/libstore/filetransfer.cc | 219 +++++++++++++++++++---------------- 1 file changed, 118 insertions(+), 101 deletions(-) diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index 2fc28e345..f3ca107aa 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -847,6 +847,14 @@ struct curlFileTransfer : public FileTransfer return std::move(*eager); } + auto source = make_box_ptr(*this, uri, headers, std::move(data), noBody); + auto metadata = source->startTransfer(); + source->awaitData(); + return {std::move(metadata), std::move(source)}; + } + + struct TransferSource : Source + { struct State { bool done = false; std::exception_ptr exc; @@ -854,126 +862,135 @@ struct curlFileTransfer : public FileTransfer std::condition_variable avail; }; - auto _state = std::make_shared>(); + curlFileTransfer & parent; + std::string uri; + Headers headers; + std::optional data; + bool noBody; - auto item = std::make_shared( - *this, - uri, - headers, - getCurActivity(), - [_state](std::exception_ptr ex) { - auto state(_state->lock()); - state->done = true; - state->exc = ex; - state->avail.notify_one(); - }, - [_state](std::string_view data) { - auto state(_state->lock()); + const std::shared_ptr> _state = std::make_shared>(); + std::shared_ptr transfer; + std::string chunk; + std::string_view buffered; - /* If the buffer is full, then go to sleep until the calling - thread wakes us up (i.e. when it has removed data from the - buffer). We don't wait forever to prevent stalling the - download thread. (Hopefully sleeping will throttle the - sender.) */ - if (state->data.size() > 1024 * 1024) { - return false; - } - - /* Append data to the buffer and wake up the calling - thread. */ - state->data.append(data); - state->avail.notify_one(); - return true; - }, - std::move(data), - noBody - ); - enqueueItem(item); - - struct TransferSource : Source + TransferSource( + curlFileTransfer & parent, + const std::string & uri, + const Headers & headers, + std::optional data, + bool noBody + ) + : parent(parent) + , uri(uri) + , headers(headers) + , data(std::move(data)) + , noBody(noBody) { - const std::shared_ptr> _state; - std::shared_ptr transfer; - std::string chunk; - std::string_view buffered; + } - explicit TransferSource( - const std::shared_ptr> & state, std::shared_ptr transfer - ) - : _state(state) - , transfer(std::move(transfer)) - { - } - - ~TransferSource() - { - // wake up the download thread if it's still going and have it abort - try { + ~TransferSource() + { + // wake up the download thread if it's still going and have it abort + try { + if (transfer) { transfer->cancel(); - } catch (...) { - ignoreExceptionInDestructor(); } + } catch (...) { + ignoreExceptionInDestructor(); } + } - void awaitData() - { - /* Grab data if available, otherwise wait for the download - thread to wake us up. */ - while (buffered.empty()) { + FileTransferResult startTransfer() + { + transfer = std::make_shared( + parent, + uri, + headers, + getCurActivity(), + [_state{_state}](std::exception_ptr ex) { + auto state(_state->lock()); + state->done = true; + state->exc = ex; + state->avail.notify_one(); + }, + [_state{_state}](std::string_view data) { auto state(_state->lock()); - if (state->data.empty()) { - if (state->done) { - if (state->exc) { - std::rethrow_exception(state->exc); - } - return; + /* If the buffer is full, then go to sleep until the calling + thread wakes us up (i.e. when it has removed data from the + buffer). We don't wait forever to prevent stalling the + download thread. (Hopefully sleeping will throttle the + sender.) */ + if (state->data.size() > 1024 * 1024) { + return false; + } + + /* Append data to the buffer and wake up the calling + thread. */ + state->data.append(data); + state->avail.notify_one(); + return true; + }, + std::move(data), + noBody + ); + parent.enqueueItem(transfer); + return transfer->metadataPromise.get_future().get(); + } + + void awaitData() + { + /* Grab data if available, otherwise wait for the download + thread to wake us up. */ + while (buffered.empty()) { + auto state(_state->lock()); + + if (state->data.empty()) { + if (state->done) { + if (state->exc) { + std::rethrow_exception(state->exc); } - - transfer->unpause(); - state.wait(state->avail); + return; } - chunk = std::move(state->data); - buffered = chunk; transfer->unpause(); + state.wait(state->avail); + } + + chunk = std::move(state->data); + buffered = chunk; + transfer->unpause(); + } + } + + size_t read(char * data, size_t len) override + { + auto readPartial = [this](char * data, size_t len) -> size_t { + const auto available = std::min(len, buffered.size()); + if (available == 0u) return 0u; + + memcpy(data, buffered.data(), available); + buffered.remove_prefix(available); + return available; + }; + size_t total = readPartial(data, len); + + while (total < len) { + awaitData(); + const auto current = readPartial(data + total, len - total); + total += current; + if (total == 0 || current == 0) { + break; } } - size_t read(char * data, size_t len) override - { - auto readPartial = [this](char * data, size_t len) -> size_t { - const auto available = std::min(len, buffered.size()); - if (available == 0u) return 0u; - - memcpy(data, buffered.data(), available); - buffered.remove_prefix(available); - return available; - }; - size_t total = readPartial(data, len); - - while (total < len) { - awaitData(); - const auto current = readPartial(data + total, len - total); - total += current; - if (total == 0 || current == 0) { - break; - } - } - - if (total == 0) { - throw EndOfFile("transfer finished"); - } - - return total; + if (total == 0) { + throw EndOfFile("transfer finished"); } - }; - auto metadata = item->metadataPromise.get_future().get(); - auto source = make_box_ptr(_state, item); - source->awaitData(); - return {std::move(metadata), std::move(source)}; - } + return total; + } + }; bool exists(const std::string & uri, const Headers & headers) override {