forked from lix-project/lix
Revert "Fix 'error 9 while decompressing xz file'"
This reverts commit 78fa47a7f0
.
This commit is contained in:
parent
aa739e7839
commit
03f09e1d18
7 changed files with 119 additions and 156 deletions
|
@ -10,8 +10,6 @@
|
|||
#include "nar-info-disk-cache.hh"
|
||||
#include "nar-accessor.hh"
|
||||
#include "json.hh"
|
||||
#include "retry.hh"
|
||||
#include "download.hh"
|
||||
|
||||
#include <chrono>
|
||||
|
||||
|
@ -81,7 +79,6 @@ void BinaryCacheStore::getFile(const std::string & path, Sink & sink)
|
|||
|
||||
std::shared_ptr<std::string> BinaryCacheStore::getFile(const std::string & path)
|
||||
{
|
||||
return retry<std::shared_ptr<std::string>>(downloadSettings.tries, [&]() -> std::shared_ptr<std::string> {
|
||||
StringSink sink;
|
||||
try {
|
||||
getFile(path, sink);
|
||||
|
@ -89,7 +86,6 @@ std::shared_ptr<std::string> BinaryCacheStore::getFile(const std::string & path)
|
|||
return nullptr;
|
||||
}
|
||||
return sink.s;
|
||||
});
|
||||
}
|
||||
|
||||
Path BinaryCacheStore::narInfoFileFor(const Path & storePath)
|
||||
|
|
|
@ -8,7 +8,6 @@
|
|||
#include "compression.hh"
|
||||
#include "pathlocks.hh"
|
||||
#include "finally.hh"
|
||||
#include "retry.hh"
|
||||
|
||||
#ifdef ENABLE_S3
|
||||
#include <aws/core/client/ClientConfiguration.h>
|
||||
|
@ -20,9 +19,11 @@
|
|||
#include <curl/curl.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <cmath>
|
||||
#include <cstring>
|
||||
#include <iostream>
|
||||
#include <queue>
|
||||
#include <random>
|
||||
#include <thread>
|
||||
|
||||
using namespace std::string_literals;
|
||||
|
@ -45,6 +46,9 @@ struct CurlDownloader : public Downloader
|
|||
{
|
||||
CURLM * curlm = 0;
|
||||
|
||||
std::random_device rd;
|
||||
std::mt19937 mt19937;
|
||||
|
||||
struct DownloadItem : public std::enable_shared_from_this<DownloadItem>
|
||||
{
|
||||
CurlDownloader & downloader;
|
||||
|
@ -57,6 +61,12 @@ struct CurlDownloader : public Downloader
|
|||
bool active = false; // whether the handle has been added to the multi object
|
||||
std::string status;
|
||||
|
||||
unsigned int attempt = 0;
|
||||
|
||||
/* Don't start this download until the specified time point
|
||||
has been reached. */
|
||||
std::chrono::steady_clock::time_point embargo;
|
||||
|
||||
struct curl_slist * requestHeaders = 0;
|
||||
|
||||
std::string encoding;
|
||||
|
@ -377,7 +387,9 @@ struct CurlDownloader : public Downloader
|
|||
}
|
||||
}
|
||||
|
||||
fail(
|
||||
attempt++;
|
||||
|
||||
auto exc =
|
||||
code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted
|
||||
? DownloadError(Interrupted, fmt("%s of '%s' was interrupted", request.verb(), request.uri))
|
||||
: httpStatus != 0
|
||||
|
@ -388,15 +400,31 @@ struct CurlDownloader : public Downloader
|
|||
)
|
||||
: DownloadError(err,
|
||||
fmt("unable to %s '%s': %s (%d)",
|
||||
request.verb(), request.uri, curl_easy_strerror(code), code)));
|
||||
request.verb(), request.uri, curl_easy_strerror(code), code));
|
||||
|
||||
/* If this is a transient error, then maybe retry the
|
||||
download after a while. */
|
||||
if (err == Transient && attempt < request.tries) {
|
||||
int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(downloader.mt19937));
|
||||
printError(format("warning: %s; retrying in %d ms") % exc.what() % ms);
|
||||
embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms);
|
||||
downloader.enqueueItem(shared_from_this());
|
||||
}
|
||||
else
|
||||
fail(exc);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
struct State
|
||||
{
|
||||
struct EmbargoComparator {
|
||||
bool operator() (const std::shared_ptr<DownloadItem> & i1, const std::shared_ptr<DownloadItem> & i2) {
|
||||
return i1->embargo > i2->embargo;
|
||||
}
|
||||
};
|
||||
bool quit = false;
|
||||
std::vector<std::shared_ptr<DownloadItem>> incoming;
|
||||
std::priority_queue<std::shared_ptr<DownloadItem>, std::vector<std::shared_ptr<DownloadItem>>, EmbargoComparator> incoming;
|
||||
};
|
||||
|
||||
Sync<State> state_;
|
||||
|
@ -409,6 +437,7 @@ struct CurlDownloader : public Downloader
|
|||
std::thread workerThread;
|
||||
|
||||
CurlDownloader()
|
||||
: mt19937(rd())
|
||||
{
|
||||
static std::once_flag globalInit;
|
||||
std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL);
|
||||
|
@ -502,7 +531,9 @@ struct CurlDownloader : public Downloader
|
|||
|
||||
nextWakeup = std::chrono::steady_clock::time_point();
|
||||
|
||||
/* Add new curl requests from the incoming requests queue. */
|
||||
/* Add new curl requests from the incoming requests queue,
|
||||
except for requests that are embargoed (waiting for a
|
||||
retry timeout to expire). */
|
||||
if (extraFDs[0].revents & CURL_WAIT_POLLIN) {
|
||||
char buf[1024];
|
||||
auto res = read(extraFDs[0].fd, buf, sizeof(buf));
|
||||
|
@ -511,9 +542,22 @@ struct CurlDownloader : public Downloader
|
|||
}
|
||||
|
||||
std::vector<std::shared_ptr<DownloadItem>> incoming;
|
||||
auto now = std::chrono::steady_clock::now();
|
||||
|
||||
{
|
||||
auto state(state_.lock());
|
||||
std::swap(state->incoming, incoming);
|
||||
while (!state->incoming.empty()) {
|
||||
auto item = state->incoming.top();
|
||||
if (item->embargo <= now) {
|
||||
incoming.push_back(item);
|
||||
state->incoming.pop();
|
||||
} else {
|
||||
if (nextWakeup == std::chrono::steady_clock::time_point()
|
||||
|| item->embargo < nextWakeup)
|
||||
nextWakeup = item->embargo;
|
||||
break;
|
||||
}
|
||||
}
|
||||
quit = state->quit;
|
||||
}
|
||||
|
||||
|
@ -540,7 +584,7 @@ struct CurlDownloader : public Downloader
|
|||
|
||||
{
|
||||
auto state(state_.lock());
|
||||
state->incoming.clear();
|
||||
while (!state->incoming.empty()) state->incoming.pop();
|
||||
state->quit = true;
|
||||
}
|
||||
}
|
||||
|
@ -556,7 +600,7 @@ struct CurlDownloader : public Downloader
|
|||
auto state(state_.lock());
|
||||
if (state->quit)
|
||||
throw nix::Error("cannot enqueue download request because the download thread is shutting down");
|
||||
state->incoming.push_back(item);
|
||||
state->incoming.push(item);
|
||||
}
|
||||
writeFull(wakeupPipe.writeSide.get(), " ");
|
||||
}
|
||||
|
@ -639,9 +683,7 @@ std::future<DownloadResult> Downloader::enqueueDownload(const DownloadRequest &
|
|||
|
||||
DownloadResult Downloader::download(const DownloadRequest & request)
|
||||
{
|
||||
return retry<DownloadResult>(request.tries, [&]() {
|
||||
return enqueueDownload(request).get();
|
||||
});
|
||||
}
|
||||
|
||||
void Downloader::download(DownloadRequest && request, Sink & sink)
|
||||
|
@ -827,7 +869,7 @@ CachedDownloadResult Downloader::downloadCached(
|
|||
writeFile(dataFile, url + "\n" + res.etag + "\n" + std::to_string(time(0)) + "\n");
|
||||
} catch (DownloadError & e) {
|
||||
if (storePath.empty()) throw;
|
||||
warn("%s; using cached result", e.msg());
|
||||
printError(format("warning: %1%; using cached result") % e.msg());
|
||||
result.etag = expectedETag;
|
||||
}
|
||||
}
|
||||
|
@ -878,4 +920,5 @@ bool isUri(const string & s)
|
|||
return scheme == "http" || scheme == "https" || scheme == "file" || scheme == "channel" || scheme == "git" || scheme == "s3" || scheme == "ssh";
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -96,13 +96,11 @@ struct Downloader
|
|||
|
||||
std::future<DownloadResult> enqueueDownload(const DownloadRequest & request);
|
||||
|
||||
/* Synchronously download a file. The request will be retried in
|
||||
case of transient failures. */
|
||||
/* Synchronously download a file. */
|
||||
DownloadResult download(const DownloadRequest & request);
|
||||
|
||||
/* Download a file, writing its data to a sink. The sink will be
|
||||
invoked on the thread of the caller. The request will not be
|
||||
retried in case of transient failures. */
|
||||
invoked on the thread of the caller. */
|
||||
void download(DownloadRequest && request, Sink & sink);
|
||||
|
||||
/* Check if the specified file is already in ~/.cache/nix/tarballs
|
||||
|
@ -128,11 +126,6 @@ public:
|
|||
DownloadError(Downloader::Error error, const FormatOrString & fs)
|
||||
: Error(fs), error(error)
|
||||
{ }
|
||||
|
||||
bool isTransient() override
|
||||
{
|
||||
return error == Downloader::Error::Transient;
|
||||
}
|
||||
};
|
||||
|
||||
bool isUri(const string & s);
|
||||
|
|
|
@ -2,7 +2,6 @@
|
|||
#include "download.hh"
|
||||
#include "globals.hh"
|
||||
#include "nar-info-disk-cache.hh"
|
||||
#include "retry.hh"
|
||||
|
||||
namespace nix {
|
||||
|
||||
|
@ -114,6 +113,7 @@ protected:
|
|||
DownloadRequest makeRequest(const std::string & path)
|
||||
{
|
||||
DownloadRequest request(cacheUri + "/" + path);
|
||||
request.tries = 8;
|
||||
return request;
|
||||
}
|
||||
|
||||
|
@ -136,46 +136,21 @@ protected:
|
|||
{
|
||||
checkEnabled();
|
||||
|
||||
struct State
|
||||
{
|
||||
DownloadRequest request;
|
||||
std::function<void()> tryDownload;
|
||||
unsigned int attempt = 0;
|
||||
State(DownloadRequest && request) : request(request) {}
|
||||
};
|
||||
auto request(makeRequest(path));
|
||||
|
||||
auto state = std::make_shared<State>(makeRequest(path));
|
||||
|
||||
state->tryDownload = [callback, state, this]() {
|
||||
getDownloader()->enqueueDownload(state->request,
|
||||
{[callback, state, this](std::future<DownloadResult> result) {
|
||||
getDownloader()->enqueueDownload(request,
|
||||
{[callback, this](std::future<DownloadResult> result) {
|
||||
try {
|
||||
callback(result.get().data);
|
||||
} catch (DownloadError & e) {
|
||||
if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden)
|
||||
return callback(std::shared_ptr<std::string>());
|
||||
++state->attempt;
|
||||
if (state->attempt < state->request.tries && e.isTransient()) {
|
||||
auto ms = retrySleepTime(state->attempt);
|
||||
warn("%s; retrying in %d ms", e.what(), ms);
|
||||
/* We can't sleep here because that would
|
||||
block the download thread. So use a
|
||||
separate thread for sleeping. */
|
||||
std::thread([state, ms]() {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(ms));
|
||||
state->tryDownload();
|
||||
}).detach();
|
||||
} else {
|
||||
maybeDisable();
|
||||
callback.rethrow();
|
||||
}
|
||||
} catch (...) {
|
||||
callback.rethrow();
|
||||
}
|
||||
}});
|
||||
};
|
||||
|
||||
state->tryDownload();
|
||||
}
|
||||
|
||||
};
|
||||
|
|
|
@ -6,11 +6,10 @@
|
|||
#include "thread-pool.hh"
|
||||
#include "json.hh"
|
||||
#include "derivations.hh"
|
||||
#include "retry.hh"
|
||||
#include "download.hh"
|
||||
|
||||
#include <future>
|
||||
|
||||
|
||||
namespace nix {
|
||||
|
||||
|
||||
|
@ -580,8 +579,6 @@ void Store::buildPaths(const PathSet & paths, BuildMode buildMode)
|
|||
void copyStorePath(ref<Store> srcStore, ref<Store> dstStore,
|
||||
const Path & storePath, RepairFlag repair, CheckSigsFlag checkSigs)
|
||||
{
|
||||
retry<void>(downloadSettings.tries, [&]() {
|
||||
|
||||
auto srcUri = srcStore->getUri();
|
||||
auto dstUri = dstStore->getUri();
|
||||
|
||||
|
@ -630,7 +627,6 @@ void copyStorePath(ref<Store> srcStore, ref<Store> dstStore,
|
|||
});
|
||||
|
||||
dstStore->addToStore(*info, *source, repair, checkSigs);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -1,38 +0,0 @@
|
|||
#pragma once
|
||||
|
||||
#include "logging.hh"
|
||||
|
||||
#include <functional>
|
||||
#include <cmath>
|
||||
#include <random>
|
||||
#include <thread>
|
||||
|
||||
namespace nix {
|
||||
|
||||
inline unsigned int retrySleepTime(unsigned int attempt)
|
||||
{
|
||||
std::random_device rd;
|
||||
std::mt19937 mt19937;
|
||||
return 250.0 * std::pow(2.0f,
|
||||
attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(mt19937));
|
||||
}
|
||||
|
||||
template<typename C>
|
||||
C retry(unsigned int attempts, std::function<C()> && f)
|
||||
{
|
||||
unsigned int attempt = 0;
|
||||
while (true) {
|
||||
try {
|
||||
return f();
|
||||
} catch (BaseError & e) {
|
||||
++attempt;
|
||||
if (attempt >= attempts || !e.isTransient())
|
||||
throw;
|
||||
auto ms = retrySleepTime(attempt);
|
||||
warn("%s; retrying in %d ms", e.what(), ms);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(ms));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -109,8 +109,6 @@ public:
|
|||
const string & msg() const { return err; }
|
||||
const string & prefix() const { return prefix_; }
|
||||
BaseError & addPrefix(const FormatOrString & fs);
|
||||
|
||||
virtual bool isTransient() { return false; }
|
||||
};
|
||||
|
||||
#define MakeError(newClass, superClass) \
|
||||
|
|
Loading…
Reference in a new issue