libstore: move file transfer retry handling out of TransferItem

this simplifies the immediate curl wrappers significantly and clarifies
control flow for retries. we can almost see the promise land from here!

Change-Id: Idc66b744631eec9c1ad5a2be388beb942a04f1f9
This commit is contained in:
eldritch horrors 2024-11-09 01:17:28 +01:00
parent d75c399b29
commit 001a078b96

View file

@ -21,7 +21,6 @@
#include <algorithm> #include <algorithm>
#include <cmath> #include <cmath>
#include <cstring> #include <cstring>
#include <queue>
#include <random> #include <random>
#include <thread> #include <thread>
#include <regex> #include <regex>
@ -38,8 +37,6 @@ struct curlFileTransfer : public FileTransfer
{ {
CURLM * curlm = 0; CURLM * curlm = 0;
std::random_device rd;
std::mt19937 mt19937;
const unsigned int baseRetryTimeMs; const unsigned int baseRetryTimeMs;
struct TransferItem : public std::enable_shared_from_this<TransferItem> struct TransferItem : public std::enable_shared_from_this<TransferItem>
@ -48,15 +45,12 @@ struct curlFileTransfer : public FileTransfer
std::string uri; std::string uri;
FileTransferResult result; FileTransferResult result;
Activity act; Activity act;
std::optional<std::string> uploadData; std::optional<std::string_view> uploadData;
std::string downloadData; std::string downloadData;
bool noBody = false; // \equiv HTTP HEAD, don't download data bool noBody = false; // \equiv HTTP HEAD, don't download data
enum { enum {
/// nothing has been transferred yet /// nothing has been transferred yet
initialSetup, initialSetup,
/// at least some metadata has already been transferred,
/// but the transfer did not succeed and is now retrying
retrySetup,
/// data transfer in progress /// data transfer in progress
transferring, transferring,
/// transfer complete, result or failure reported /// transfer complete, result or failure reported
@ -69,14 +63,8 @@ struct curlFileTransfer : public FileTransfer
CURL * req; // must never be nullptr CURL * req; // must never be nullptr
std::string statusMsg; std::string statusMsg;
unsigned int attempt = 0;
const size_t tries = fileTransferSettings.tries;
uint64_t bodySize = 0; uint64_t bodySize = 0;
/* Don't start this download until the specified time point
has been reached. */
std::chrono::steady_clock::time_point embargo;
struct curl_slist * requestHeaders = 0; struct curl_slist * requestHeaders = 0;
curl_off_t writtenToSink = 0; curl_off_t writtenToSink = 0;
@ -104,8 +92,9 @@ struct curlFileTransfer : public FileTransfer
ActivityId parentAct, ActivityId parentAct,
std::invocable<std::exception_ptr> auto doneCallback, std::invocable<std::exception_ptr> auto doneCallback,
std::function<bool(std::string_view data)> dataCallback, std::function<bool(std::string_view data)> dataCallback,
std::optional<std::string> uploadData, std::optional<std::string_view> uploadData,
bool noBody bool noBody,
curl_off_t writtenToSink
) )
: fileTransfer(fileTransfer) : fileTransfer(fileTransfer)
, uri(uri) , uri(uri)
@ -119,6 +108,7 @@ struct curlFileTransfer : public FileTransfer
}) })
, dataCallback(std::move(dataCallback)) , dataCallback(std::move(dataCallback))
, req(curl_easy_init()) , req(curl_easy_init())
, writtenToSink(writtenToSink)
{ {
if (req == nullptr) { if (req == nullptr) {
throw FileTransferError(Misc, {}, "could not allocate curl handle"); throw FileTransferError(Misc, {}, "could not allocate curl handle");
@ -168,26 +158,15 @@ struct curlFileTransfer : public FileTransfer
failEx(std::make_exception_ptr(std::forward<T>(e))); failEx(std::make_exception_ptr(std::forward<T>(e)));
} }
[[noreturn]]
void throwChangedTarget(std::string_view what, std::string_view from, std::string_view to)
{
throw FileTransferError(
Misc, {}, "uri %s changed %s from %s to %s during transfer", uri, what, from, to
);
}
void maybeFinishSetup() void maybeFinishSetup()
{ {
if (phase > retrySetup) { if (phase > initialSetup) {
return; return;
} }
char * effectiveUriCStr = nullptr; char * effectiveUriCStr = nullptr;
curl_easy_getinfo(req, CURLINFO_EFFECTIVE_URL, &effectiveUriCStr); curl_easy_getinfo(req, CURLINFO_EFFECTIVE_URL, &effectiveUriCStr);
if (effectiveUriCStr) { if (effectiveUriCStr) {
if (!result.effectiveUri.empty() && result.effectiveUri != effectiveUriCStr) {
throwChangedTarget("final destination", result.effectiveUri, effectiveUriCStr);
}
result.effectiveUri = effectiveUriCStr; result.effectiveUri = effectiveUriCStr;
} }
@ -250,9 +229,6 @@ struct curlFileTransfer : public FileTransfer
// NOTE we don't check that the etag hasn't gone *missing*. technically // NOTE we don't check that the etag hasn't gone *missing*. technically
// this is not an error as long as we get the same data from the remote. // this is not an error as long as we get the same data from the remote.
auto etag = trim(line.substr(i + 1)); auto etag = trim(line.substr(i + 1));
if (!result.etag.empty() && result.etag != etag) {
throwChangedTarget("ETag", result.etag, etag);
}
result.etag = std::move(etag); result.etag = std::move(etag);
} }
@ -260,9 +236,6 @@ struct curlFileTransfer : public FileTransfer
auto value = trim(line.substr(i + 1)); auto value = trim(line.substr(i + 1));
static std::regex linkRegex("<([^>]*)>; rel=\"immutable\"", std::regex::extended | std::regex::icase); static std::regex linkRegex("<([^>]*)>; rel=\"immutable\"", std::regex::extended | std::regex::icase);
if (std::smatch match; std::regex_match(value, match, linkRegex)) { if (std::smatch match; std::regex_match(value, match, linkRegex)) {
if (result.immutableUrl && result.immutableUrl != match.str(1)) {
throwChangedTarget("immutable url", *result.immutableUrl, match.str(1));
}
result.immutableUrl = match.str(1); result.immutableUrl = match.str(1);
} else } else
debug("got invalid link header '%s'", value); debug("got invalid link header '%s'", value);
@ -324,10 +297,6 @@ struct curlFileTransfer : public FileTransfer
void init() void init()
{ {
if (phase > initialSetup) {
phase = retrySetup;
}
curl_easy_reset(req); curl_easy_reset(req);
if (verbosity >= lvlVomit) { if (verbosity >= lvlVomit) {
@ -335,10 +304,6 @@ struct curlFileTransfer : public FileTransfer
curl_easy_setopt(req, CURLOPT_DEBUGFUNCTION, TransferItem::debugCallback); curl_easy_setopt(req, CURLOPT_DEBUGFUNCTION, TransferItem::debugCallback);
} }
// use the effective URI of the previous transfer for retries. this avoids
// some silent corruption if a redirect changes between starting and retry.
const auto & uri = result.effectiveUri.empty() ? this->uri : result.effectiveUri;
curl_easy_setopt(req, CURLOPT_URL, uri.c_str()); curl_easy_setopt(req, CURLOPT_URL, uri.c_str());
curl_easy_setopt(req, CURLOPT_FOLLOWLOCATION, 1L); curl_easy_setopt(req, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(req, CURLOPT_ACCEPT_ENCODING, ""); // all of them! curl_easy_setopt(req, CURLOPT_ACCEPT_ENCODING, ""); // all of them!
@ -467,8 +432,6 @@ struct curlFileTransfer : public FileTransfer
#pragma GCC diagnostic pop #pragma GCC diagnostic pop
} }
attempt++;
std::optional<std::string> response; std::optional<std::string> response;
if (!successfulStatuses.count(httpStatus)) if (!successfulStatuses.count(httpStatus))
response = std::move(downloadData); response = std::move(downloadData);
@ -486,26 +449,6 @@ struct curlFileTransfer : public FileTransfer
"unable to %s '%s': %s (%d)", "unable to %s '%s': %s (%d)",
verb(), uri, curl_easy_strerror(code), code); verb(), uri, curl_easy_strerror(code), code);
/* If this is a transient error, then maybe retry the
download after a while. If we're writing to a
sink, we can only retry if the server supports
ranged requests. */
if (err == Transient
&& !uploadData.has_value()
&& attempt < tries
&& (!this->dataCallback
|| writtenToSink == 0
|| acceptsRanges()))
{
int ms = fileTransfer.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(fileTransfer.mt19937));
if (writtenToSink)
warn("%s; retrying from offset %d in %d ms", exc.what(), writtenToSink, ms);
else
warn("%s; retrying in %d ms", exc.what(), ms);
embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms);
fileTransfer.enqueueItem(shared_from_this());
}
else
fail(std::move(exc)); fail(std::move(exc));
} }
} }
@ -535,13 +478,8 @@ struct curlFileTransfer : public FileTransfer
struct State struct State
{ {
/// Ordering predicate for the priority queue of incoming transfer items.
/// Returns true when `i1` has a later embargo than `i2`; used as the
/// comparator of a std::priority_queue this puts the item with the earliest
/// embargo at the top. The call operator is const so the comparator can also
/// be invoked through a const object.
struct EmbargoComparator {
    bool operator() (const std::shared_ptr<TransferItem> & i1, const std::shared_ptr<TransferItem> & i2) const {
        return i1->embargo > i2->embargo;
    }
};
bool quit = false; bool quit = false;
std::priority_queue<std::shared_ptr<TransferItem>, std::vector<std::shared_ptr<TransferItem>>, EmbargoComparator> incoming; std::vector<std::shared_ptr<TransferItem>> incoming;
std::vector<std::shared_ptr<TransferItem>> unpause; std::vector<std::shared_ptr<TransferItem>> unpause;
std::map<std::shared_ptr<TransferItem>, std::promise<void>> cancel; std::map<std::shared_ptr<TransferItem>, std::promise<void>> cancel;
}; };
@ -551,8 +489,7 @@ struct curlFileTransfer : public FileTransfer
std::thread workerThread; std::thread workerThread;
curlFileTransfer(unsigned int baseRetryTimeMs) curlFileTransfer(unsigned int baseRetryTimeMs)
: mt19937(rd()) : baseRetryTimeMs(baseRetryTimeMs)
, baseRetryTimeMs(baseRetryTimeMs)
{ {
static std::once_flag globalInit; static std::once_flag globalInit;
std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL); std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL);
@ -657,7 +594,6 @@ struct curlFileTransfer : public FileTransfer
retry timeout to expire). */ retry timeout to expire). */
std::vector<std::shared_ptr<TransferItem>> incoming; std::vector<std::shared_ptr<TransferItem>> incoming;
auto now = std::chrono::steady_clock::now();
timeoutMs = INT64_MAX; timeoutMs = INT64_MAX;
@ -670,18 +606,7 @@ struct curlFileTransfer : public FileTransfer
{ {
auto state(state_.lock()); auto state(state_.lock());
while (!state->incoming.empty()) { incoming = std::move(state->incoming);
auto item = state->incoming.top();
if (item->embargo <= now) {
incoming.push_back(item);
state->incoming.pop();
} else {
using namespace std::chrono;
auto wait = duration_cast<milliseconds>(item->embargo - now);
timeoutMs = std::min<int64_t>(timeoutMs, wait.count());
break;
}
}
quit = state->quit; quit = state->quit;
} }
@ -707,7 +632,7 @@ struct curlFileTransfer : public FileTransfer
{ {
auto state(state_.lock()); auto state(state_.lock());
while (!state->incoming.empty()) state->incoming.pop(); state->incoming.clear();
state->quit = true; state->quit = true;
} }
} }
@ -723,7 +648,7 @@ struct curlFileTransfer : public FileTransfer
auto state(state_.lock()); auto state(state_.lock());
if (state->quit) if (state->quit)
throw nix::Error("cannot enqueue download request because the download thread is shutting down"); throw nix::Error("cannot enqueue download request because the download thread is shutting down");
state->incoming.push(item); state->incoming.push_back(item);
} }
wakeup(); wakeup();
} }
@ -848,9 +773,8 @@ struct curlFileTransfer : public FileTransfer
} }
auto source = make_box_ptr<TransferSource>(*this, uri, headers, std::move(data), noBody); auto source = make_box_ptr<TransferSource>(*this, uri, headers, std::move(data), noBody);
auto metadata = source->startTransfer();
source->awaitData(); source->awaitData();
return {std::move(metadata), std::move(source)}; return {source->metadata, std::move(source)};
} }
struct TransferSource : Source struct TransferSource : Source
@ -867,12 +791,17 @@ struct curlFileTransfer : public FileTransfer
Headers headers; Headers headers;
std::optional<std::string> data; std::optional<std::string> data;
bool noBody; bool noBody;
ActivityId parentAct = getCurActivity();
const std::shared_ptr<Sync<State>> _state = std::make_shared<Sync<State>>(); const std::shared_ptr<Sync<State>> _state = std::make_shared<Sync<State>>();
std::shared_ptr<TransferItem> transfer; std::shared_ptr<TransferItem> transfer;
FileTransferResult metadata;
std::string chunk; std::string chunk;
std::string_view buffered; std::string_view buffered;
unsigned int attempt = 0;
const size_t tries = fileTransferSettings.tries;
TransferSource( TransferSource(
curlFileTransfer & parent, curlFileTransfer & parent,
const std::string & uri, const std::string & uri,
@ -886,6 +815,7 @@ struct curlFileTransfer : public FileTransfer
, data(std::move(data)) , data(std::move(data))
, noBody(noBody) , noBody(noBody)
{ {
metadata = withRetries([&] { return startTransfer(uri); });
} }
~TransferSource() ~TransferSource()
@ -900,16 +830,30 @@ struct curlFileTransfer : public FileTransfer
} }
} }
FileTransferResult startTransfer() auto withRetries(auto fn) -> decltype(fn())
{ {
while (true) {
try {
return fn();
} catch (FileTransferError & e) {
if (!attemptRetry(e)) {
throw;
}
}
}
}
FileTransferResult startTransfer(const std::string & uri, curl_off_t offset = 0)
{
attempt += 1;
transfer = std::make_shared<TransferItem>( transfer = std::make_shared<TransferItem>(
parent, parent,
uri, uri,
headers, headers,
getCurActivity(), parentAct,
[_state{_state}](std::exception_ptr ex) { [_state{_state}](std::exception_ptr ex) {
auto state(_state->lock()); auto state(_state->lock());
state->done = true; state->done = ex == nullptr;
state->exc = ex; state->exc = ex;
state->avail.notify_one(); state->avail.notify_one();
}, },
@ -931,25 +875,81 @@ struct curlFileTransfer : public FileTransfer
state->avail.notify_one(); state->avail.notify_one();
return true; return true;
}, },
std::move(data), data ? std::optional(std::string_view(*data)) : std::nullopt,
noBody noBody,
offset
); );
parent.enqueueItem(transfer); parent.enqueueItem(transfer);
return transfer->metadataPromise.get_future().get(); return transfer->metadataPromise.get_future().get();
} }
/// Compare a previously recorded property of the transfer target against the
/// value reported by a later attempt, and throw a FileTransferError if they
/// differ.
///
/// `what` names the property in the error message, `from` is the earlier
/// value and `to` the new one. Nothing is thrown when `from` is empty or when
/// both values are equal.
void throwChangedTarget(std::string_view what, std::string_view from, std::string_view to)
{
    const bool unchanged = from.empty() || from == to;
    if (unchanged) {
        return;
    }

    throw FileTransferError(
        Misc, {}, "uri %s changed %s from %s to %s during transfer", uri, what, from, to
    );
}
/// Decide whether the transfer that failed with `context` may be retried and,
/// if so, wait out a randomized backoff and start a replacement transfer.
///
/// Returns false when the error is not `Transient`, when this source carries
/// upload `data`, when `attempt` has reached `tries`, or when bytes were
/// already written to the sink and the failed transfer does not accept
/// ranges. Returns true once the replacement transfer has produced metadata
/// that agrees with the stored `metadata`.
///
/// Throws FileTransferError (via throwChangedTarget) if the replacement
/// reports a different effective URI, ETag or immutable URL. Anything thrown
/// by startTransfer propagates unchanged.
///
/// NOTE(review): `state` stays locked for the whole function, including the
/// sleep and the startTransfer call — confirm no transfer callback needs this
/// lock before the new metadata is delivered.
bool attemptRetry(FileTransferError & context)
{
    auto state(_state->lock());

    // everything buffered in the shared state must already have been drained.
    assert(state->data.empty());

    // If this is a transient error, then maybe retry after a while. after any
    // bytes have been received we require range support to proceed, otherwise
    // we'd need to start from scratch and discard everything we already have.
    if (context.error != Transient) {
        return false;
    }
    if (data.has_value()) {
        return false;
    }
    if (attempt >= tries) {
        return false;
    }
    if (transfer->writtenToSink > 0 && !transfer->acceptsRanges()) {
        return false;
    }

    // exponential backoff: the base delay is scaled by two to the power of
    // the attempt number, plus a random extra exponent drawn from [0, 0.5).
    thread_local std::minstd_rand rng{std::random_device{}()};
    std::uniform_real_distribution<> jitter(0.0, 0.5);
    int delayMs = parent.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + jitter(rng));

    if (transfer->writtenToSink) {
        warn("%s; retrying from offset %d in %d ms", context.what(), transfer->writtenToSink, delayMs);
    } else {
        warn("%s; retrying in %d ms", context.what(), delayMs);
    }

    // sleep for the full delay; when nanosleep is interrupted it stores the
    // time still left in `remaining`, and we go back to sleep for that long.
    timespec remaining{};
    remaining.tv_sec = delayMs / 1000;
    remaining.tv_nsec = (delayMs % 1000) * 1000000;
    while (true) {
        const bool interrupted = nanosleep(&remaining, &remaining) < 0 && errno == EINTR;
        if (!interrupted) {
            break;
        }
    }

    // forget the recorded failure before starting over.
    state->exc = nullptr;

    // use the effective URI of the previous transfer for retries. this avoids
    // some silent corruption if a redirect changes between starting and retry
    const auto & retryUri = metadata.effectiveUri.empty() ? this->uri : metadata.effectiveUri;

    // resume at the number of bytes the failed transfer had written so far.
    auto retryMeta = startTransfer(retryUri, transfer->writtenToSink);

    // the replacement must still point at the same thing as the first attempt.
    throwChangedTarget("final destination", metadata.effectiveUri, retryMeta.effectiveUri);
    throwChangedTarget("ETag", metadata.etag, retryMeta.etag);
    const auto knownImmutable = metadata.immutableUrl.value_or("");
    const auto retryImmutable = retryMeta.immutableUrl.value_or("");
    throwChangedTarget("immutable url", knownImmutable, retryImmutable);

    return true;
}
void awaitData() void awaitData()
{ {
withRetries([&] {
/* Grab data if available, otherwise wait for the download /* Grab data if available, otherwise wait for the download
thread to wake us up. */ thread to wake us up. */
while (buffered.empty()) { while (buffered.empty()) {
auto state(_state->lock()); auto state(_state->lock());
if (state->data.empty()) { if (state->data.empty()) {
if (state->done) {
if (state->exc) { if (state->exc) {
std::rethrow_exception(state->exc); std::rethrow_exception(state->exc);
} } else if (state->done) {
return; return;
} }
@ -961,6 +961,7 @@ struct curlFileTransfer : public FileTransfer
buffered = chunk; buffered = chunk;
transfer->unpause(); transfer->unpause();
} }
});
} }
size_t read(char * data, size_t len) override size_t read(char * data, size_t len) override