From 001a078b96cb45473855e757e1a412b2faac6022 Mon Sep 17 00:00:00 2001 From: eldritch horrors Date: Sat, 9 Nov 2024 01:17:28 +0100 Subject: [PATCH] libstore: move file transfer retry handling out of TransferItem this simplifies the immediate curl wrappers significantly and clarifies control flow for retries. we can almost see the promise land from here! Change-Id: Idc66b744631eec9c1ad5a2be388beb942a04f1f9 --- src/libstore/filetransfer.cc | 215 ++++++++++++++++++----------------- 1 file changed, 108 insertions(+), 107 deletions(-) diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index f3ca107aa..ab68cd3fd 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -21,7 +21,6 @@ #include #include #include -#include #include #include #include @@ -38,8 +37,6 @@ struct curlFileTransfer : public FileTransfer { CURLM * curlm = 0; - std::random_device rd; - std::mt19937 mt19937; const unsigned int baseRetryTimeMs; struct TransferItem : public std::enable_shared_from_this @@ -48,15 +45,12 @@ struct curlFileTransfer : public FileTransfer std::string uri; FileTransferResult result; Activity act; - std::optional uploadData; + std::optional uploadData; std::string downloadData; bool noBody = false; // \equiv HTTP HEAD, don't download data enum { /// nothing has been transferred yet initialSetup, - /// at least some metadata has already been transferred, - /// but the transfer did not succeed and is now retrying - retrySetup, /// data transfer in progress transferring, /// transfer complete, result or failure reported @@ -69,14 +63,8 @@ struct curlFileTransfer : public FileTransfer CURL * req; // must never be nullptr std::string statusMsg; - unsigned int attempt = 0; - const size_t tries = fileTransferSettings.tries; uint64_t bodySize = 0; - /* Don't start this download until the specified time point - has been reached. */ - std::chrono::steady_clock::time_point embargo; - struct curl_slist * requestHeaders = 0; curl_off_t writtenToSink = 0; @@ -104,8 +92,9 @@ struct curlFileTransfer : public FileTransfer ActivityId parentAct, std::invocable auto doneCallback, std::function dataCallback, - std::optional uploadData, - bool noBody + std::optional uploadData, + bool noBody, + curl_off_t writtenToSink ) : fileTransfer(fileTransfer) , uri(uri) @@ -119,6 +108,7 @@ struct curlFileTransfer : public FileTransfer }) , dataCallback(std::move(dataCallback)) , req(curl_easy_init()) + , writtenToSink(writtenToSink) { if (req == nullptr) { throw FileTransferError(Misc, {}, "could not allocate curl handle"); @@ -168,26 +158,15 @@ struct curlFileTransfer : public FileTransfer failEx(std::make_exception_ptr(std::forward(e))); } - [[noreturn]] - void throwChangedTarget(std::string_view what, std::string_view from, std::string_view to) - { - throw FileTransferError( - Misc, {}, "uri %s changed %s from %s to %s during transfer", uri, what, from, to - ); - } - void maybeFinishSetup() { - if (phase > retrySetup) { + if (phase > initialSetup) { return; } char * effectiveUriCStr = nullptr; curl_easy_getinfo(req, CURLINFO_EFFECTIVE_URL, &effectiveUriCStr); if (effectiveUriCStr) { - if (!result.effectiveUri.empty() && result.effectiveUri != effectiveUriCStr) { - throwChangedTarget("final destination", result.effectiveUri, effectiveUriCStr); - } result.effectiveUri = effectiveUriCStr; } @@ -250,9 +229,6 @@ struct curlFileTransfer : public FileTransfer // NOTE we don't check that the etag hasn't gone *missing*. technically // this is not an error as long as we get the same data from the remote. auto etag = trim(line.substr(i + 1)); - if (!result.etag.empty() && result.etag != etag) { - throwChangedTarget("ETag", result.etag, etag); - } result.etag = std::move(etag); } @@ -260,9 +236,6 @@ struct curlFileTransfer : public FileTransfer auto value = trim(line.substr(i + 1)); static std::regex linkRegex("<([^>]*)>; rel=\"immutable\"", std::regex::extended | std::regex::icase); if (std::smatch match; std::regex_match(value, match, linkRegex)) { - if (result.immutableUrl && result.immutableUrl != match.str(1)) { - throwChangedTarget("immutable url", *result.immutableUrl, match.str(1)); - } result.immutableUrl = match.str(1); } else debug("got invalid link header '%s'", value); @@ -324,10 +297,6 @@ struct curlFileTransfer : public FileTransfer void init() { - if (phase > initialSetup) { - phase = retrySetup; - } - curl_easy_reset(req); if (verbosity >= lvlVomit) { @@ -335,10 +304,6 @@ struct curlFileTransfer : public FileTransfer curl_easy_setopt(req, CURLOPT_DEBUGFUNCTION, TransferItem::debugCallback); } - // use the effective URI of the previous transfer for retries. this avoids - // some silent corruption if a redirect changes between starting and retry. - const auto & uri = result.effectiveUri.empty() ? this->uri : result.effectiveUri; - curl_easy_setopt(req, CURLOPT_URL, uri.c_str()); curl_easy_setopt(req, CURLOPT_FOLLOWLOCATION, 1L); curl_easy_setopt(req, CURLOPT_ACCEPT_ENCODING, ""); // all of them! @@ -467,8 +432,6 @@ struct curlFileTransfer : public FileTransfer #pragma GCC diagnostic pop } - attempt++; - std::optional response; if (!successfulStatuses.count(httpStatus)) response = std::move(downloadData); @@ -486,27 +449,7 @@ struct curlFileTransfer : public FileTransfer "unable to %s '%s': %s (%d)", verb(), uri, curl_easy_strerror(code), code); - /* If this is a transient error, then maybe retry the - download after a while. If we're writing to a - sink, we can only retry if the server supports - ranged requests. */ - if (err == Transient - && !uploadData.has_value() - && attempt < tries - && (!this->dataCallback - || writtenToSink == 0 - || acceptsRanges())) - { - int ms = fileTransfer.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(fileTransfer.mt19937)); - if (writtenToSink) - warn("%s; retrying from offset %d in %d ms", exc.what(), writtenToSink, ms); - else - warn("%s; retrying in %d ms", exc.what(), ms); - embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms); - fileTransfer.enqueueItem(shared_from_this()); - } - else - fail(std::move(exc)); + fail(std::move(exc)); } } @@ -535,13 +478,8 @@ struct curlFileTransfer : public FileTransfer struct State { - struct EmbargoComparator { - bool operator() (const std::shared_ptr & i1, const std::shared_ptr & i2) { - return i1->embargo > i2->embargo; - } - }; bool quit = false; - std::priority_queue, std::vector>, EmbargoComparator> incoming; + std::vector> incoming; std::vector> unpause; std::map, std::promise> cancel; }; @@ -551,8 +489,7 @@ struct curlFileTransfer : public FileTransfer std::thread workerThread; curlFileTransfer(unsigned int baseRetryTimeMs) - : mt19937(rd()) - , baseRetryTimeMs(baseRetryTimeMs) + : baseRetryTimeMs(baseRetryTimeMs) { static std::once_flag globalInit; std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL); @@ -657,7 +594,6 @@ struct curlFileTransfer : public FileTransfer retry timeout to expire). */ std::vector> incoming; - auto now = std::chrono::steady_clock::now(); timeoutMs = INT64_MAX; @@ -670,18 +606,7 @@ struct curlFileTransfer : public FileTransfer { auto state(state_.lock()); - while (!state->incoming.empty()) { - auto item = state->incoming.top(); - if (item->embargo <= now) { - incoming.push_back(item); - state->incoming.pop(); - } else { - using namespace std::chrono; - auto wait = duration_cast(item->embargo - now); - timeoutMs = std::min(timeoutMs, wait.count()); - break; - } - } + incoming = std::move(state->incoming); quit = state->quit; } @@ -707,7 +632,7 @@ struct curlFileTransfer : public FileTransfer { auto state(state_.lock()); - while (!state->incoming.empty()) state->incoming.pop(); + state->incoming.clear(); state->quit = true; } } @@ -723,7 +648,7 @@ struct curlFileTransfer : public FileTransfer auto state(state_.lock()); if (state->quit) throw nix::Error("cannot enqueue download request because the download thread is shutting down"); - state->incoming.push(item); + state->incoming.push_back(item); } wakeup(); } @@ -848,9 +773,8 @@ struct curlFileTransfer : public FileTransfer } auto source = make_box_ptr(*this, uri, headers, std::move(data), noBody); - auto metadata = source->startTransfer(); source->awaitData(); - return {std::move(metadata), std::move(source)}; + return {source->metadata, std::move(source)}; } struct TransferSource : Source @@ -867,12 +791,17 @@ struct curlFileTransfer : public FileTransfer Headers headers; std::optional data; bool noBody; + ActivityId parentAct = getCurActivity(); const std::shared_ptr> _state = std::make_shared>(); std::shared_ptr transfer; + FileTransferResult metadata; std::string chunk; std::string_view buffered; + unsigned int attempt = 0; + const size_t tries = fileTransferSettings.tries; + TransferSource( curlFileTransfer & parent, const std::string & uri, @@ -886,6 +815,7 @@ struct curlFileTransfer : public FileTransfer , data(std::move(data)) , noBody(noBody) { + metadata = withRetries([&] { return startTransfer(uri); }); } ~TransferSource() @@ -900,16 +830,30 @@ struct curlFileTransfer : public FileTransfer } } - FileTransferResult startTransfer() + auto withRetries(auto fn) -> decltype(fn()) { + while (true) { + try { + return fn(); + } catch (FileTransferError & e) { + if (!attemptRetry(e)) { + throw; + } + } + } + } + + FileTransferResult startTransfer(const std::string & uri, curl_off_t offset = 0) + { + attempt += 1; transfer = std::make_shared( parent, uri, headers, - getCurActivity(), + parentAct, [_state{_state}](std::exception_ptr ex) { auto state(_state->lock()); - state->done = true; + state->done = ex == nullptr; state->exc = ex; state->avail.notify_one(); }, @@ -931,36 +875,93 @@ struct curlFileTransfer : public FileTransfer state->avail.notify_one(); return true; }, - std::move(data), - noBody + data ? std::optional(std::string_view(*data)) : std::nullopt, + noBody, + offset ); parent.enqueueItem(transfer); return transfer->metadataPromise.get_future().get(); } + void throwChangedTarget(std::string_view what, std::string_view from, std::string_view to) + { + if (!from.empty() && from != to) { + throw FileTransferError( + Misc, {}, "uri %s changed %s from %s to %s during transfer", uri, what, from, to + ); + } + } + + bool attemptRetry(FileTransferError & context) + { + auto state(_state->lock()); + + assert(state->data.empty()); + + // If this is a transient error, then maybe retry after a while. after any + // bytes have been received we require range support to proceed, otherwise + // we'd need to start from scratch and discard everything we already have. + if (context.error != Transient || data.has_value() || attempt >= tries + || (transfer->writtenToSink > 0 && !transfer->acceptsRanges())) + { + return false; + } + + thread_local std::minstd_rand random{std::random_device{}()}; + std::uniform_real_distribution<> dist(0.0, 0.5); + int ms = parent.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + dist(random)); + if (transfer->writtenToSink) { + warn("%s; retrying from offset %d in %d ms", context.what(), transfer->writtenToSink, ms); + } else { + warn("%s; retrying in %d ms", context.what(), ms); + } + + timespec sleep = {.tv_sec = ms / 1000, .tv_nsec = (ms % 1000) * 1000000}; + while (nanosleep(&sleep, &sleep) < 0 && errno == EINTR) { + } + + state->exc = nullptr; + + // use the effective URI of the previous transfer for retries. this avoids + // some silent corruption if a redirect changes between starting and retry + const auto & uri = metadata.effectiveUri.empty() ? this->uri : metadata.effectiveUri; + + auto newMeta = startTransfer(uri, transfer->writtenToSink); + throwChangedTarget("final destination", metadata.effectiveUri, newMeta.effectiveUri); + throwChangedTarget("ETag", metadata.etag, newMeta.etag); + throwChangedTarget( + "immutable url", + metadata.immutableUrl.value_or(""), + newMeta.immutableUrl.value_or("") + ); + + return true; + } + void awaitData() { - /* Grab data if available, otherwise wait for the download - thread to wake us up. */ - while (buffered.empty()) { - auto state(_state->lock()); + withRetries([&] { + /* Grab data if available, otherwise wait for the download + thread to wake us up. */ + while (buffered.empty()) { + auto state(_state->lock()); - if (state->data.empty()) { - if (state->done) { + if (state->data.empty()) { if (state->exc) { std::rethrow_exception(state->exc); + } else if (state->done) { + return; } - return; + + transfer->unpause(); + state.wait(state->avail); } + chunk = std::move(state->data); + buffered = chunk; transfer->unpause(); - state.wait(state->avail); } - - chunk = std::move(state->data); - buffered = chunk; - transfer->unpause(); - } + }); } size_t read(char * data, size_t len) override