diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index f3ca107aa..ab68cd3fd 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -21,7 +21,6 @@ #include #include #include -#include #include #include #include @@ -38,8 +37,6 @@ struct curlFileTransfer : public FileTransfer { CURLM * curlm = 0; - std::random_device rd; - std::mt19937 mt19937; const unsigned int baseRetryTimeMs; struct TransferItem : public std::enable_shared_from_this @@ -48,15 +45,12 @@ struct curlFileTransfer : public FileTransfer std::string uri; FileTransferResult result; Activity act; - std::optional uploadData; + std::optional uploadData; std::string downloadData; bool noBody = false; // \equiv HTTP HEAD, don't download data enum { /// nothing has been transferred yet initialSetup, - /// at least some metadata has already been transferred, - /// but the transfer did not succeed and is now retrying - retrySetup, /// data transfer in progress transferring, /// transfer complete, result or failure reported @@ -69,14 +63,8 @@ struct curlFileTransfer : public FileTransfer CURL * req; // must never be nullptr std::string statusMsg; - unsigned int attempt = 0; - const size_t tries = fileTransferSettings.tries; uint64_t bodySize = 0; - /* Don't start this download until the specified time point - has been reached. */ - std::chrono::steady_clock::time_point embargo; - struct curl_slist * requestHeaders = 0; curl_off_t writtenToSink = 0; @@ -104,8 +92,9 @@ struct curlFileTransfer : public FileTransfer ActivityId parentAct, std::invocable auto doneCallback, std::function dataCallback, - std::optional uploadData, - bool noBody + std::optional uploadData, + bool noBody, + curl_off_t writtenToSink ) : fileTransfer(fileTransfer) , uri(uri) @@ -119,6 +108,7 @@ struct curlFileTransfer : public FileTransfer }) , dataCallback(std::move(dataCallback)) , req(curl_easy_init()) + , writtenToSink(writtenToSink) { if (req == nullptr) { throw FileTransferError(Misc, {}, "could not allocate curl handle"); @@ -168,26 +158,15 @@ struct curlFileTransfer : public FileTransfer failEx(std::make_exception_ptr(std::forward(e))); } - [[noreturn]] - void throwChangedTarget(std::string_view what, std::string_view from, std::string_view to) - { - throw FileTransferError( - Misc, {}, "uri %s changed %s from %s to %s during transfer", uri, what, from, to - ); - } - void maybeFinishSetup() { - if (phase > retrySetup) { + if (phase > initialSetup) { return; } char * effectiveUriCStr = nullptr; curl_easy_getinfo(req, CURLINFO_EFFECTIVE_URL, &effectiveUriCStr); if (effectiveUriCStr) { - if (!result.effectiveUri.empty() && result.effectiveUri != effectiveUriCStr) { - throwChangedTarget("final destination", result.effectiveUri, effectiveUriCStr); - } result.effectiveUri = effectiveUriCStr; } @@ -250,9 +229,6 @@ struct curlFileTransfer : public FileTransfer // NOTE we don't check that the etag hasn't gone *missing*. technically // this is not an error as long as we get the same data from the remote. auto etag = trim(line.substr(i + 1)); - if (!result.etag.empty() && result.etag != etag) { - throwChangedTarget("ETag", result.etag, etag); - } result.etag = std::move(etag); } @@ -260,9 +236,6 @@ struct curlFileTransfer : public FileTransfer auto value = trim(line.substr(i + 1)); static std::regex linkRegex("<([^>]*)>; rel=\"immutable\"", std::regex::extended | std::regex::icase); if (std::smatch match; std::regex_match(value, match, linkRegex)) { - if (result.immutableUrl && result.immutableUrl != match.str(1)) { - throwChangedTarget("immutable url", *result.immutableUrl, match.str(1)); - } result.immutableUrl = match.str(1); } else debug("got invalid link header '%s'", value); @@ -324,10 +297,6 @@ struct curlFileTransfer : public FileTransfer void init() { - if (phase > initialSetup) { - phase = retrySetup; - } - curl_easy_reset(req); if (verbosity >= lvlVomit) { @@ -335,10 +304,6 @@ struct curlFileTransfer : public FileTransfer curl_easy_setopt(req, CURLOPT_DEBUGFUNCTION, TransferItem::debugCallback); } - // use the effective URI of the previous transfer for retries. this avoids - // some silent corruption if a redirect changes between starting and retry. - const auto & uri = result.effectiveUri.empty() ? this->uri : result.effectiveUri; - curl_easy_setopt(req, CURLOPT_URL, uri.c_str()); curl_easy_setopt(req, CURLOPT_FOLLOWLOCATION, 1L); curl_easy_setopt(req, CURLOPT_ACCEPT_ENCODING, ""); // all of them! @@ -467,8 +432,6 @@ struct curlFileTransfer : public FileTransfer #pragma GCC diagnostic pop } - attempt++; - std::optional response; if (!successfulStatuses.count(httpStatus)) response = std::move(downloadData); @@ -486,27 +449,7 @@ struct curlFileTransfer : public FileTransfer "unable to %s '%s': %s (%d)", verb(), uri, curl_easy_strerror(code), code); - /* If this is a transient error, then maybe retry the - download after a while. If we're writing to a - sink, we can only retry if the server supports - ranged requests. */ - if (err == Transient - && !uploadData.has_value() - && attempt < tries - && (!this->dataCallback - || writtenToSink == 0 - || acceptsRanges())) - { - int ms = fileTransfer.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(fileTransfer.mt19937)); - if (writtenToSink) - warn("%s; retrying from offset %d in %d ms", exc.what(), writtenToSink, ms); - else - warn("%s; retrying in %d ms", exc.what(), ms); - embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms); - fileTransfer.enqueueItem(shared_from_this()); - } - else - fail(std::move(exc)); + fail(std::move(exc)); } } @@ -535,13 +478,8 @@ struct curlFileTransfer : public FileTransfer struct State { - struct EmbargoComparator { - bool operator() (const std::shared_ptr & i1, const std::shared_ptr & i2) { - return i1->embargo > i2->embargo; - } - }; bool quit = false; - std::priority_queue, std::vector>, EmbargoComparator> incoming; + std::vector> incoming; std::vector> unpause; std::map, std::promise> cancel; }; @@ -551,8 +489,7 @@ struct curlFileTransfer : public FileTransfer std::thread workerThread; curlFileTransfer(unsigned int baseRetryTimeMs) - : mt19937(rd()) - , baseRetryTimeMs(baseRetryTimeMs) + : baseRetryTimeMs(baseRetryTimeMs) { static std::once_flag globalInit; std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL); @@ -657,7 +594,6 @@ struct curlFileTransfer : public FileTransfer retry timeout to expire). */ std::vector> incoming; - auto now = std::chrono::steady_clock::now(); timeoutMs = INT64_MAX; @@ -670,18 +606,7 @@ struct curlFileTransfer : public FileTransfer { auto state(state_.lock()); - while (!state->incoming.empty()) { - auto item = state->incoming.top(); - if (item->embargo <= now) { - incoming.push_back(item); - state->incoming.pop(); - } else { - using namespace std::chrono; - auto wait = duration_cast(item->embargo - now); - timeoutMs = std::min(timeoutMs, wait.count()); - break; - } - } + incoming = std::move(state->incoming); quit = state->quit; } @@ -707,7 +632,7 @@ struct curlFileTransfer : public FileTransfer { auto state(state_.lock()); - while (!state->incoming.empty()) state->incoming.pop(); + state->incoming.clear(); state->quit = true; } } @@ -723,7 +648,7 @@ struct curlFileTransfer : public FileTransfer auto state(state_.lock()); if (state->quit) throw nix::Error("cannot enqueue download request because the download thread is shutting down"); - state->incoming.push(item); + state->incoming.push_back(item); } wakeup(); } @@ -848,9 +773,8 @@ struct curlFileTransfer : public FileTransfer } auto source = make_box_ptr(*this, uri, headers, std::move(data), noBody); - auto metadata = source->startTransfer(); source->awaitData(); - return {std::move(metadata), std::move(source)}; + return {source->metadata, std::move(source)}; } struct TransferSource : Source @@ -867,12 +791,17 @@ struct curlFileTransfer : public FileTransfer Headers headers; std::optional data; bool noBody; + ActivityId parentAct = getCurActivity(); const std::shared_ptr> _state = std::make_shared>(); std::shared_ptr transfer; + FileTransferResult metadata; std::string chunk; std::string_view buffered; + unsigned int attempt = 0; + const size_t tries = fileTransferSettings.tries; + TransferSource( curlFileTransfer & parent, const std::string & uri, @@ -886,6 +815,7 @@ struct curlFileTransfer : public FileTransfer , data(std::move(data)) , noBody(noBody) { + metadata = withRetries([&] { return startTransfer(uri); }); } ~TransferSource() @@ -900,16 +830,30 @@ struct curlFileTransfer : public FileTransfer } } - FileTransferResult startTransfer() + auto withRetries(auto fn) -> decltype(fn()) { + while (true) { + try { + return fn(); + } catch (FileTransferError & e) { + if (!attemptRetry(e)) { + throw; + } + } + } + } + + FileTransferResult startTransfer(const std::string & uri, curl_off_t offset = 0) + { + attempt += 1; transfer = std::make_shared( parent, uri, headers, - getCurActivity(), + parentAct, [_state{_state}](std::exception_ptr ex) { auto state(_state->lock()); - state->done = true; + state->done = ex == nullptr; state->exc = ex; state->avail.notify_one(); }, @@ -931,36 +875,93 @@ struct curlFileTransfer : public FileTransfer state->avail.notify_one(); return true; }, - std::move(data), - noBody + data ? std::optional(std::string_view(*data)) : std::nullopt, + noBody, + offset ); parent.enqueueItem(transfer); return transfer->metadataPromise.get_future().get(); } + void throwChangedTarget(std::string_view what, std::string_view from, std::string_view to) + { + if (!from.empty() && from != to) { + throw FileTransferError( + Misc, {}, "uri %s changed %s from %s to %s during transfer", uri, what, from, to + ); + } + } + + bool attemptRetry(FileTransferError & context) + { + auto state(_state->lock()); + + assert(state->data.empty()); + + // If this is a transient error, then maybe retry after a while. after any + // bytes have been received we require range support to proceed, otherwise + // we'd need to start from scratch and discard everything we already have. + if (context.error != Transient || data.has_value() || attempt >= tries + || (transfer->writtenToSink > 0 && !transfer->acceptsRanges())) + { + return false; + } + + thread_local std::minstd_rand random{std::random_device{}()}; + std::uniform_real_distribution<> dist(0.0, 0.5); + int ms = parent.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + dist(random)); + if (transfer->writtenToSink) { + warn("%s; retrying from offset %d in %d ms", context.what(), transfer->writtenToSink, ms); + } else { + warn("%s; retrying in %d ms", context.what(), ms); + } + + timespec sleep = {.tv_sec = ms / 1000, .tv_nsec = (ms % 1000) * 1000000}; + while (nanosleep(&sleep, &sleep) < 0 && errno == EINTR) { + } + + state->exc = nullptr; + + // use the effective URI of the previous transfer for retries. this avoids + // some silent corruption if a redirect changes between starting and retry + const auto & uri = metadata.effectiveUri.empty() ? this->uri : metadata.effectiveUri; + + auto newMeta = startTransfer(uri, transfer->writtenToSink); + throwChangedTarget("final destination", metadata.effectiveUri, newMeta.effectiveUri); + throwChangedTarget("ETag", metadata.etag, newMeta.etag); + throwChangedTarget( + "immutable url", + metadata.immutableUrl.value_or(""), + newMeta.immutableUrl.value_or("") + ); + + return true; + } + void awaitData() { - /* Grab data if available, otherwise wait for the download - thread to wake us up. */ - while (buffered.empty()) { - auto state(_state->lock()); + withRetries([&] { + /* Grab data if available, otherwise wait for the download + thread to wake us up. */ + while (buffered.empty()) { + auto state(_state->lock()); - if (state->data.empty()) { - if (state->done) { + if (state->data.empty()) { if (state->exc) { std::rethrow_exception(state->exc); + } else if (state->done) { + return; } - return; + + transfer->unpause(); + state.wait(state->avail); } + chunk = std::move(state->data); + buffered = chunk; transfer->unpause(); - state.wait(state->avail); } - - chunk = std::move(state->data); - buffered = chunk; - transfer->unpause(); - } + }); } size_t read(char * data, size_t len) override