diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index 2fc28e345..f3ca107aa 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -847,6 +847,14 @@ struct curlFileTransfer : public FileTransfer return std::move(*eager); } + auto source = make_box_ptr(*this, uri, headers, std::move(data), noBody); + auto metadata = source->startTransfer(); + source->awaitData(); + return {std::move(metadata), std::move(source)}; + } + + struct TransferSource : Source + { struct State { bool done = false; std::exception_ptr exc; @@ -854,126 +862,135 @@ struct curlFileTransfer : public FileTransfer std::condition_variable avail; }; - auto _state = std::make_shared>(); + curlFileTransfer & parent; + std::string uri; + Headers headers; + std::optional data; + bool noBody; - auto item = std::make_shared( - *this, - uri, - headers, - getCurActivity(), - [_state](std::exception_ptr ex) { - auto state(_state->lock()); - state->done = true; - state->exc = ex; - state->avail.notify_one(); - }, - [_state](std::string_view data) { - auto state(_state->lock()); + const std::shared_ptr> _state = std::make_shared>(); + std::shared_ptr transfer; + std::string chunk; + std::string_view buffered; - /* If the buffer is full, then go to sleep until the calling - thread wakes us up (i.e. when it has removed data from the - buffer). We don't wait forever to prevent stalling the - download thread. (Hopefully sleeping will throttle the - sender.) */ - if (state->data.size() > 1024 * 1024) { - return false; - } - - /* Append data to the buffer and wake up the calling - thread. */ - state->data.append(data); - state->avail.notify_one(); - return true; - }, - std::move(data), - noBody - ); - enqueueItem(item); - - struct TransferSource : Source + TransferSource( + curlFileTransfer & parent, + const std::string & uri, + const Headers & headers, + std::optional data, + bool noBody + ) + : parent(parent) + , uri(uri) + , headers(headers) + , data(std::move(data)) + , noBody(noBody) { - const std::shared_ptr> _state; - std::shared_ptr transfer; - std::string chunk; - std::string_view buffered; + } - explicit TransferSource( - const std::shared_ptr> & state, std::shared_ptr transfer - ) - : _state(state) - , transfer(std::move(transfer)) - { - } - - ~TransferSource() - { - // wake up the download thread if it's still going and have it abort - try { + ~TransferSource() + { + // wake up the download thread if it's still going and have it abort + try { + if (transfer) { transfer->cancel(); - } catch (...) { - ignoreExceptionInDestructor(); } + } catch (...) { + ignoreExceptionInDestructor(); } + } - void awaitData() - { - /* Grab data if available, otherwise wait for the download - thread to wake us up. */ - while (buffered.empty()) { + FileTransferResult startTransfer() + { + transfer = std::make_shared( + parent, + uri, + headers, + getCurActivity(), + [_state{_state}](std::exception_ptr ex) { + auto state(_state->lock()); + state->done = true; + state->exc = ex; + state->avail.notify_one(); + }, + [_state{_state}](std::string_view data) { auto state(_state->lock()); - if (state->data.empty()) { - if (state->done) { - if (state->exc) { - std::rethrow_exception(state->exc); - } - return; + /* If the buffer is full, then go to sleep until the calling + thread wakes us up (i.e. when it has removed data from the + buffer). We don't wait forever to prevent stalling the + download thread. (Hopefully sleeping will throttle the + sender.) */ + if (state->data.size() > 1024 * 1024) { + return false; + } + + /* Append data to the buffer and wake up the calling + thread. */ + state->data.append(data); + state->avail.notify_one(); + return true; + }, + std::move(data), + noBody + ); + parent.enqueueItem(transfer); + return transfer->metadataPromise.get_future().get(); + } + + void awaitData() + { + /* Grab data if available, otherwise wait for the download + thread to wake us up. */ + while (buffered.empty()) { + auto state(_state->lock()); + + if (state->data.empty()) { + if (state->done) { + if (state->exc) { + std::rethrow_exception(state->exc); } - - transfer->unpause(); - state.wait(state->avail); + return; } - chunk = std::move(state->data); - buffered = chunk; transfer->unpause(); + state.wait(state->avail); + } + + chunk = std::move(state->data); + buffered = chunk; + transfer->unpause(); + } + } + + size_t read(char * data, size_t len) override + { + auto readPartial = [this](char * data, size_t len) -> size_t { + const auto available = std::min(len, buffered.size()); + if (available == 0u) return 0u; + + memcpy(data, buffered.data(), available); + buffered.remove_prefix(available); + return available; + }; + size_t total = readPartial(data, len); + + while (total < len) { + awaitData(); + const auto current = readPartial(data + total, len - total); + total += current; + if (total == 0 || current == 0) { + break; } } - size_t read(char * data, size_t len) override - { - auto readPartial = [this](char * data, size_t len) -> size_t { - const auto available = std::min(len, buffered.size()); - if (available == 0u) return 0u; - - memcpy(data, buffered.data(), available); - buffered.remove_prefix(available); - return available; - }; - size_t total = readPartial(data, len); - - while (total < len) { - awaitData(); - const auto current = readPartial(data + total, len - total); - total += current; - if (total == 0 || current == 0) { - break; - } - } - - if (total == 0) { - throw EndOfFile("transfer finished"); - } - - return total; + if (total == 0) { + throw EndOfFile("transfer finished"); } - }; - auto metadata = item->metadataPromise.get_future().get(); - auto source = make_box_ptr(_state, item); - source->awaitData(); - return {std::move(metadata), std::move(source)}; - } + return total; + } + }; bool exists(const std::string & uri, const Headers & headers) override {