libstore: move remaining retry state out of TransferItem

Change-Id: Ie2c22722bdda18027017e12d10e16931a694743e
This commit is contained in:
eldritch horrors 2024-11-12 14:01:46 +01:00
parent 8a9094303b
commit 5be7956592

View file

@ -66,8 +66,6 @@ struct curlFileTransfer : public FileTransfer
struct curl_slist * requestHeaders = 0; struct curl_slist * requestHeaders = 0;
curl_off_t writtenToSink = 0;
inline static const std::set<long> successfulStatuses {200, 201, 204, 206, 304, 0 /* other protocol */}; inline static const std::set<long> successfulStatuses {200, 201, 204, 206, 304, 0 /* other protocol */};
/* Get the HTTP status code, or 0 for other protocols. */ /* Get the HTTP status code, or 0 for other protocols. */
long getHTTPStatus() long getHTTPStatus()
@ -106,7 +104,6 @@ struct curlFileTransfer : public FileTransfer
}) })
, dataCallback(std::move(dataCallback)) , dataCallback(std::move(dataCallback))
, req(curl_easy_init()) , req(curl_easy_init())
, writtenToSink(writtenToSink)
{ {
if (req == nullptr) { if (req == nullptr) {
throw FileTransferError(Misc, {}, "could not allocate curl handle"); throw FileTransferError(Misc, {}, "could not allocate curl handle");
@ -249,7 +246,6 @@ struct curlFileTransfer : public FileTransfer
if (!dataCallback({static_cast<const char *>(contents), realSize})) { if (!dataCallback({static_cast<const char *>(contents), realSize})) {
return CURL_WRITEFUNC_PAUSE; return CURL_WRITEFUNC_PAUSE;
} }
writtenToSink += realSize;
} else { } else {
this->downloadData.append(static_cast<const char *>(contents), realSize); this->downloadData.append(static_cast<const char *>(contents), realSize);
} }
@ -275,8 +271,6 @@ struct curlFileTransfer : public FileTransfer
static std::regex statusLine("HTTP/[^ ]+ +[0-9]+(.*)", std::regex::extended | std::regex::icase); static std::regex statusLine("HTTP/[^ ]+ +[0-9]+(.*)", std::regex::extended | std::regex::icase);
if (std::smatch match; std::regex_match(line, match, statusLine)) { if (std::smatch match; std::regex_match(line, match, statusLine)) {
downloadData.clear();
bodySize = 0;
statusMsg = trim(match.str(1)); statusMsg = trim(match.str(1));
} else { } else {
auto i = line.find(':'); auto i = line.find(':');
@ -790,6 +784,7 @@ struct curlFileTransfer : public FileTransfer
unsigned int attempt = 0; unsigned int attempt = 0;
const size_t tries = fileTransferSettings.tries; const size_t tries = fileTransferSettings.tries;
curl_off_t totalReceived = 0;
TransferSource( TransferSource(
curlFileTransfer & parent, curlFileTransfer & parent,
@ -821,13 +816,23 @@ struct curlFileTransfer : public FileTransfer
auto withRetries(auto fn) -> decltype(fn()) auto withRetries(auto fn) -> decltype(fn())
{ {
std::optional<std::string> retryContext;
while (true) { while (true) {
try { try {
if (retryContext) {
attemptRetry(*retryContext);
}
return fn(); return fn();
} catch (FileTransferError & e) { } catch (FileTransferError & e) {
if (!attemptRetry(e)) { // If this is a transient error, then maybe retry after a while. after any
// bytes have been received we require range support to proceed, otherwise
// we'd need to start from scratch and discard everything we already have.
if (e.error != Transient || data.has_value() || attempt >= tries
|| (totalReceived > 0 && !transfer->acceptsRanges()))
{
throw; throw;
} }
retryContext = e.what();
} }
} }
} }
@ -881,33 +886,22 @@ struct curlFileTransfer : public FileTransfer
} }
} }
bool attemptRetry(FileTransferError & context) bool attemptRetry(const std::string & context)
{ {
auto state(_state->lock()); auto state(_state->lock());
assert(state->data.empty()); assert(state->data.empty());
// If this is a transient error, then maybe retry after a while. after any
// bytes have been received we require range support to proceed, otherwise
// we'd need to start from scratch and discard everything we already have.
if (context.error != Transient || data.has_value() || attempt >= tries
|| (transfer->writtenToSink > 0 && !transfer->acceptsRanges()))
{
return false;
}
thread_local std::minstd_rand random{std::random_device{}()}; thread_local std::minstd_rand random{std::random_device{}()};
std::uniform_real_distribution<> dist(0.0, 0.5); std::uniform_real_distribution<> dist(0.0, 0.5);
int ms = parent.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + dist(random)); int ms = parent.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + dist(random));
if (transfer->writtenToSink) { if (totalReceived) {
warn("%s; retrying from offset %d in %d ms", context.what(), transfer->writtenToSink, ms); warn("%s; retrying from offset %d in %d ms", context, totalReceived, ms);
} else { } else {
warn("%s; retrying in %d ms", context.what(), ms); warn("%s; retrying in %d ms", context, ms);
} }
timespec sleep = {.tv_sec = ms / 1000, .tv_nsec = (ms % 1000) * 1000000}; std::this_thread::sleep_for(std::chrono::milliseconds(ms));
while (nanosleep(&sleep, &sleep) < 0 && errno == EINTR) {
}
state->exc = nullptr; state->exc = nullptr;
@ -915,7 +909,7 @@ struct curlFileTransfer : public FileTransfer
// some silent corruption if a redirect changes between starting and retry // some silent corruption if a redirect changes between starting and retry
const auto & uri = metadata.effectiveUri.empty() ? this->uri : metadata.effectiveUri; const auto & uri = metadata.effectiveUri.empty() ? this->uri : metadata.effectiveUri;
auto newMeta = startTransfer(uri, transfer->writtenToSink); auto newMeta = startTransfer(uri, totalReceived);
throwChangedTarget("final destination", metadata.effectiveUri, newMeta.effectiveUri); throwChangedTarget("final destination", metadata.effectiveUri, newMeta.effectiveUri);
throwChangedTarget("ETag", metadata.etag, newMeta.etag); throwChangedTarget("ETag", metadata.etag, newMeta.etag);
throwChangedTarget( throwChangedTarget(
@ -948,6 +942,7 @@ struct curlFileTransfer : public FileTransfer
chunk = std::move(state->data); chunk = std::move(state->data);
buffered = chunk; buffered = chunk;
totalReceived += chunk.size();
transfer->unpause(); transfer->unpause();
} }
}); });