From aa739e783993864aa6e0c8a4820e6b59f4626d92 Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Wed, 10 Jul 2019 10:29:41 +0200 Subject: [PATCH 1/5] nix copy: Rename --substitute to --substitute-on-destination '--substitute' was being shadowed by the regular '--substitute' (the short-hand for '--option substitute true'). Fixes #2983. --- src/nix/copy.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/nix/copy.cc b/src/nix/copy.cc index 96bd453d8..12a9f9cd3 100644 --- a/src/nix/copy.cc +++ b/src/nix/copy.cc @@ -36,7 +36,7 @@ struct CmdCopy : StorePathsCommand .set(&checkSigs, NoCheckSigs); mkFlag() - .longName("substitute") + .longName("substitute-on-destination") .shortName('s') .description("whether to try substitutes on the destination store (only supported by SSH)") .set(&substitute, Substitute); From 03f09e1d18c39706b917964f14f9e40513e4aaea Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Wed, 10 Jul 2019 19:46:15 +0200 Subject: [PATCH 2/5] Revert "Fix 'error 9 while decompressing xz file'" This reverts commit 78fa47a7f08a4cb6ee7061bf0bd86a40e1d6dc91. 
--- src/libstore/binary-cache-store.cc | 18 +++--- src/libstore/download.cc | 67 ++++++++++++++++---- src/libstore/download.hh | 11 +--- src/libstore/http-binary-cache-store.cc | 55 +++++----------- src/libstore/store-api.cc | 84 ++++++++++++------------- src/libutil/retry.hh | 38 ----------- src/libutil/types.hh | 2 - 7 files changed, 119 insertions(+), 156 deletions(-) delete mode 100644 src/libutil/retry.hh diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc index 8b736056e..4527ee6ba 100644 --- a/src/libstore/binary-cache-store.cc +++ b/src/libstore/binary-cache-store.cc @@ -10,8 +10,6 @@ #include "nar-info-disk-cache.hh" #include "nar-accessor.hh" #include "json.hh" -#include "retry.hh" -#include "download.hh" #include @@ -81,15 +79,13 @@ void BinaryCacheStore::getFile(const std::string & path, Sink & sink) std::shared_ptr BinaryCacheStore::getFile(const std::string & path) { - return retry>(downloadSettings.tries, [&]() -> std::shared_ptr { - StringSink sink; - try { - getFile(path, sink); - } catch (NoSuchBinaryCacheFile &) { - return nullptr; - } - return sink.s; - }); + StringSink sink; + try { + getFile(path, sink); + } catch (NoSuchBinaryCacheFile &) { + return nullptr; + } + return sink.s; } Path BinaryCacheStore::narInfoFileFor(const Path & storePath) diff --git a/src/libstore/download.cc b/src/libstore/download.cc index 7a2af237e..2267b5d35 100644 --- a/src/libstore/download.cc +++ b/src/libstore/download.cc @@ -8,7 +8,6 @@ #include "compression.hh" #include "pathlocks.hh" #include "finally.hh" -#include "retry.hh" #ifdef ENABLE_S3 #include @@ -20,9 +19,11 @@ #include #include +#include #include #include #include +#include #include using namespace std::string_literals; @@ -45,6 +46,9 @@ struct CurlDownloader : public Downloader { CURLM * curlm = 0; + std::random_device rd; + std::mt19937 mt19937; + struct DownloadItem : public std::enable_shared_from_this { CurlDownloader & downloader; @@ -57,6 +61,12 @@ struct 
CurlDownloader : public Downloader bool active = false; // whether the handle has been added to the multi object std::string status; + unsigned int attempt = 0; + + /* Don't start this download until the specified time point + has been reached. */ + std::chrono::steady_clock::time_point embargo; + struct curl_slist * requestHeaders = 0; std::string encoding; @@ -377,7 +387,9 @@ struct CurlDownloader : public Downloader } } - fail( + attempt++; + + auto exc = code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted ? DownloadError(Interrupted, fmt("%s of '%s' was interrupted", request.verb(), request.uri)) : httpStatus != 0 @@ -388,15 +400,31 @@ struct CurlDownloader : public Downloader ) : DownloadError(err, fmt("unable to %s '%s': %s (%d)", - request.verb(), request.uri, curl_easy_strerror(code), code))); + request.verb(), request.uri, curl_easy_strerror(code), code)); + + /* If this is a transient error, then maybe retry the + download after a while. */ + if (err == Transient && attempt < request.tries) { + int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(downloader.mt19937)); + printError(format("warning: %s; retrying in %d ms") % exc.what() % ms); + embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms); + downloader.enqueueItem(shared_from_this()); + } + else + fail(exc); } } }; struct State { + struct EmbargoComparator { + bool operator() (const std::shared_ptr & i1, const std::shared_ptr & i2) { + return i1->embargo > i2->embargo; + } + }; bool quit = false; - std::vector> incoming; + std::priority_queue, std::vector>, EmbargoComparator> incoming; }; Sync state_; @@ -409,6 +437,7 @@ struct CurlDownloader : public Downloader std::thread workerThread; CurlDownloader() + : mt19937(rd()) { static std::once_flag globalInit; std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL); @@ -502,7 +531,9 @@ struct CurlDownloader : public Downloader nextWakeup = 
std::chrono::steady_clock::time_point(); - /* Add new curl requests from the incoming requests queue. */ + /* Add new curl requests from the incoming requests queue, + except for requests that are embargoed (waiting for a + retry timeout to expire). */ if (extraFDs[0].revents & CURL_WAIT_POLLIN) { char buf[1024]; auto res = read(extraFDs[0].fd, buf, sizeof(buf)); @@ -511,9 +542,22 @@ struct CurlDownloader : public Downloader } std::vector> incoming; + auto now = std::chrono::steady_clock::now(); + { auto state(state_.lock()); - std::swap(state->incoming, incoming); + while (!state->incoming.empty()) { + auto item = state->incoming.top(); + if (item->embargo <= now) { + incoming.push_back(item); + state->incoming.pop(); + } else { + if (nextWakeup == std::chrono::steady_clock::time_point() + || item->embargo < nextWakeup) + nextWakeup = item->embargo; + break; + } + } quit = state->quit; } @@ -540,7 +584,7 @@ struct CurlDownloader : public Downloader { auto state(state_.lock()); - state->incoming.clear(); + while (!state->incoming.empty()) state->incoming.pop(); state->quit = true; } } @@ -556,7 +600,7 @@ struct CurlDownloader : public Downloader auto state(state_.lock()); if (state->quit) throw nix::Error("cannot enqueue download request because the download thread is shutting down"); - state->incoming.push_back(item); + state->incoming.push(item); } writeFull(wakeupPipe.writeSide.get(), " "); } @@ -639,9 +683,7 @@ std::future Downloader::enqueueDownload(const DownloadRequest & DownloadResult Downloader::download(const DownloadRequest & request) { - return retry(request.tries, [&]() { - return enqueueDownload(request).get(); - }); + return enqueueDownload(request).get(); } void Downloader::download(DownloadRequest && request, Sink & sink) @@ -827,7 +869,7 @@ CachedDownloadResult Downloader::downloadCached( writeFile(dataFile, url + "\n" + res.etag + "\n" + std::to_string(time(0)) + "\n"); } catch (DownloadError & e) { if (storePath.empty()) throw; - warn("%s; using 
cached result", e.msg()); + printError(format("warning: %1%; using cached result") % e.msg()); result.etag = expectedETag; } } @@ -878,4 +920,5 @@ bool isUri(const string & s) return scheme == "http" || scheme == "https" || scheme == "file" || scheme == "channel" || scheme == "git" || scheme == "s3" || scheme == "ssh"; } + } diff --git a/src/libstore/download.hh b/src/libstore/download.hh index 9e965b506..3b7fff3ba 100644 --- a/src/libstore/download.hh +++ b/src/libstore/download.hh @@ -96,13 +96,11 @@ struct Downloader std::future enqueueDownload(const DownloadRequest & request); - /* Synchronously download a file. The request will be retried in - case of transient failures. */ + /* Synchronously download a file. */ DownloadResult download(const DownloadRequest & request); /* Download a file, writing its data to a sink. The sink will be - invoked on the thread of the caller. The request will not be - retried in case of transient failures. */ + invoked on the thread of the caller. */ void download(DownloadRequest && request, Sink & sink); /* Check if the specified file is already in ~/.cache/nix/tarballs @@ -128,11 +126,6 @@ public: DownloadError(Downloader::Error error, const FormatOrString & fs) : Error(fs), error(error) { } - - bool isTransient() override - { - return error == Downloader::Error::Transient; - } }; bool isUri(const string & s); diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc index 5633b4355..3d17a8966 100644 --- a/src/libstore/http-binary-cache-store.cc +++ b/src/libstore/http-binary-cache-store.cc @@ -2,7 +2,6 @@ #include "download.hh" #include "globals.hh" #include "nar-info-disk-cache.hh" -#include "retry.hh" namespace nix { @@ -114,6 +113,7 @@ protected: DownloadRequest makeRequest(const std::string & path) { DownloadRequest request(cacheUri + "/" + path); + request.tries = 8; return request; } @@ -136,46 +136,21 @@ protected: { checkEnabled(); - struct State - { - DownloadRequest request; - 
std::function tryDownload; - unsigned int attempt = 0; - State(DownloadRequest && request) : request(request) {} - }; + auto request(makeRequest(path)); - auto state = std::make_shared(makeRequest(path)); - - state->tryDownload = [callback, state, this]() { - getDownloader()->enqueueDownload(state->request, - {[callback, state, this](std::future result) { - try { - callback(result.get().data); - } catch (DownloadError & e) { - if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) - return callback(std::shared_ptr()); - ++state->attempt; - if (state->attempt < state->request.tries && e.isTransient()) { - auto ms = retrySleepTime(state->attempt); - warn("%s; retrying in %d ms", e.what(), ms); - /* We can't sleep here because that would - block the download thread. So use a - separate thread for sleeping. */ - std::thread([state, ms]() { - std::this_thread::sleep_for(std::chrono::milliseconds(ms)); - state->tryDownload(); - }).detach(); - } else { - maybeDisable(); - callback.rethrow(); - } - } catch (...) { - callback.rethrow(); - } - }}); - }; - - state->tryDownload(); + getDownloader()->enqueueDownload(request, + {[callback, this](std::future result) { + try { + callback(result.get().data); + } catch (DownloadError & e) { + if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) + return callback(std::shared_ptr()); + maybeDisable(); + callback.rethrow(); + } catch (...) 
{ + callback.rethrow(); + } + }}); } }; diff --git a/src/libstore/store-api.cc b/src/libstore/store-api.cc index 92f01fd2e..f5608d384 100644 --- a/src/libstore/store-api.cc +++ b/src/libstore/store-api.cc @@ -6,11 +6,10 @@ #include "thread-pool.hh" #include "json.hh" #include "derivations.hh" -#include "retry.hh" -#include "download.hh" #include + namespace nix { @@ -580,57 +579,54 @@ void Store::buildPaths(const PathSet & paths, BuildMode buildMode) void copyStorePath(ref srcStore, ref dstStore, const Path & storePath, RepairFlag repair, CheckSigsFlag checkSigs) { - retry(downloadSettings.tries, [&]() { + auto srcUri = srcStore->getUri(); + auto dstUri = dstStore->getUri(); - auto srcUri = srcStore->getUri(); - auto dstUri = dstStore->getUri(); + Activity act(*logger, lvlInfo, actCopyPath, + srcUri == "local" || srcUri == "daemon" + ? fmt("copying path '%s' to '%s'", storePath, dstUri) + : dstUri == "local" || dstUri == "daemon" + ? fmt("copying path '%s' from '%s'", storePath, srcUri) + : fmt("copying path '%s' from '%s' to '%s'", storePath, srcUri, dstUri), + {storePath, srcUri, dstUri}); + PushActivity pact(act.id); - Activity act(*logger, lvlInfo, actCopyPath, - srcUri == "local" || srcUri == "daemon" - ? fmt("copying path '%s' to '%s'", storePath, dstUri) - : dstUri == "local" || dstUri == "daemon" - ? 
fmt("copying path '%s' from '%s'", storePath, srcUri) - : fmt("copying path '%s' from '%s' to '%s'", storePath, srcUri, dstUri), - {storePath, srcUri, dstUri}); - PushActivity pact(act.id); + auto info = srcStore->queryPathInfo(storePath); - auto info = srcStore->queryPathInfo(storePath); + uint64_t total = 0; - uint64_t total = 0; + if (!info->narHash) { + StringSink sink; + srcStore->narFromPath({storePath}, sink); + auto info2 = make_ref(*info); + info2->narHash = hashString(htSHA256, *sink.s); + if (!info->narSize) info2->narSize = sink.s->size(); + if (info->ultimate) info2->ultimate = false; + info = info2; - if (!info->narHash) { - StringSink sink; - srcStore->narFromPath({storePath}, sink); - auto info2 = make_ref(*info); - info2->narHash = hashString(htSHA256, *sink.s); - if (!info->narSize) info2->narSize = sink.s->size(); - if (info->ultimate) info2->ultimate = false; - info = info2; + StringSource source(*sink.s); + dstStore->addToStore(*info, source, repair, checkSigs); + return; + } - StringSource source(*sink.s); - dstStore->addToStore(*info, source, repair, checkSigs); - return; - } + if (info->ultimate) { + auto info2 = make_ref(*info); + info2->ultimate = false; + info = info2; + } - if (info->ultimate) { - auto info2 = make_ref(*info); - info2->ultimate = false; - info = info2; - } - - auto source = sinkToSource([&](Sink & sink) { - LambdaSink wrapperSink([&](const unsigned char * data, size_t len) { - sink(data, len); - total += len; - act.progress(total, info->narSize); - }); - srcStore->narFromPath({storePath}, wrapperSink); - }, [&]() { - throw EndOfFile("NAR for '%s' fetched from '%s' is incomplete", storePath, srcStore->getUri()); + auto source = sinkToSource([&](Sink & sink) { + LambdaSink wrapperSink([&](const unsigned char * data, size_t len) { + sink(data, len); + total += len; + act.progress(total, info->narSize); }); - - dstStore->addToStore(*info, *source, repair, checkSigs); + srcStore->narFromPath({storePath}, wrapperSink); + }, 
[&]() { + throw EndOfFile("NAR for '%s' fetched from '%s' is incomplete", storePath, srcStore->getUri()); }); + + dstStore->addToStore(*info, *source, repair, checkSigs); } diff --git a/src/libutil/retry.hh b/src/libutil/retry.hh deleted file mode 100644 index b45cb37f7..000000000 --- a/src/libutil/retry.hh +++ /dev/null @@ -1,38 +0,0 @@ -#pragma once - -#include "logging.hh" - -#include -#include -#include -#include - -namespace nix { - -inline unsigned int retrySleepTime(unsigned int attempt) -{ - std::random_device rd; - std::mt19937 mt19937; - return 250.0 * std::pow(2.0f, - attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(mt19937)); -} - -template -C retry(unsigned int attempts, std::function && f) -{ - unsigned int attempt = 0; - while (true) { - try { - return f(); - } catch (BaseError & e) { - ++attempt; - if (attempt >= attempts || !e.isTransient()) - throw; - auto ms = retrySleepTime(attempt); - warn("%s; retrying in %d ms", e.what(), ms); - std::this_thread::sleep_for(std::chrono::milliseconds(ms)); - } - } -} - -} diff --git a/src/libutil/types.hh b/src/libutil/types.hh index 88e3243f4..92bf469b5 100644 --- a/src/libutil/types.hh +++ b/src/libutil/types.hh @@ -109,8 +109,6 @@ public: const string & msg() const { return err; } const string & prefix() const { return prefix_; } BaseError & addPrefix(const FormatOrString & fs); - - virtual bool isTransient() { return false; } }; #define MakeError(newClass, superClass) \ From f76b2a7fdd7dabad7891400248f5cc778182ebf3 Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Wed, 10 Jul 2019 22:27:50 +0200 Subject: [PATCH 3/5] Downloader: Use warn() --- src/libstore/download.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/libstore/download.cc b/src/libstore/download.cc index 2267b5d35..195eccc2a 100644 --- a/src/libstore/download.cc +++ b/src/libstore/download.cc @@ -406,7 +406,7 @@ struct CurlDownloader : public Downloader download after a while. 
*/ if (err == Transient && attempt < request.tries) { int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(downloader.mt19937)); - printError(format("warning: %s; retrying in %d ms") % exc.what() % ms); + warn("%s; retrying in %d ms", exc.what(), ms); embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms); downloader.enqueueItem(shared_from_this()); } @@ -579,7 +579,7 @@ struct CurlDownloader : public Downloader workerThreadMain(); } catch (nix::Interrupted & e) { } catch (std::exception & e) { - printError(format("unexpected error in download thread: %s") % e.what()); + printError("unexpected error in download thread: %s", e.what()); } { @@ -869,7 +869,7 @@ struct CurlDownloader : public Downloader writeFile(dataFile, url + "\n" + res.etag + "\n" + std::to_string(time(0)) + "\n"); } catch (DownloadError & e) { if (storePath.empty()) throw; - printError(format("warning: %1%; using cached result") % e.msg()); + warn("%s; using cached result", e.msg()); result.etag = expectedETag; } } From 00f6fafad61db4537268e1ffa636fd0d2ae86059 Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Wed, 10 Jul 2019 23:05:04 +0200 Subject: [PATCH 4/5] HttpBinaryCacheStore: Use default number of retries for NARs --- src/libstore/http-binary-cache-store.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc index 3d17a8966..df2fb9332 100644 --- a/src/libstore/http-binary-cache-store.cc +++ b/src/libstore/http-binary-cache-store.cc @@ -113,7 +113,6 @@ protected: DownloadRequest makeRequest(const std::string & path) { DownloadRequest request(cacheUri + "/" + path); - request.tries = 8; return request; } From 53247d6b116905e7233b1efd6c14845e20d27442 Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Wed, 10 Jul 2019 23:12:17 +0200 Subject: [PATCH 5/5] Resume NAR downloads This is a much simpler fix to the 'error 9 while
decompressing xz file' problem than 78fa47a7f08a4cb6ee7061bf0bd86a40e1d6dc91. We just do a ranged HTTP request starting after the data that we previously wrote into the sink. Fixes #2952, #379. --- src/libstore/download.cc | 35 ++++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/src/libstore/download.cc b/src/libstore/download.cc index 195eccc2a..91087eebc 100644 --- a/src/libstore/download.cc +++ b/src/libstore/download.cc @@ -71,6 +71,10 @@ struct CurlDownloader : public Downloader std::string encoding; + bool acceptRanges = false; + + curl_off_t writtenToSink = 0; + DownloadItem(CurlDownloader & downloader, const DownloadRequest & request, Callback callback) @@ -81,9 +85,10 @@ struct CurlDownloader : public Downloader {request.uri}, request.parentAct) , callback(callback) , finalSink([this](const unsigned char * data, size_t len) { - if (this->request.dataCallback) + if (this->request.dataCallback) { + writtenToSink += len; this->request.dataCallback((char *) data, len); - else + } else this->result.data->append((char *) data, len); }) { @@ -161,6 +166,7 @@ struct CurlDownloader : public Downloader status = ss.size() >= 2 ? 
ss[1] : ""; result.data = std::make_shared(); result.bodySize = 0; + acceptRanges = false; encoding = ""; } else { auto i = line.find(':'); @@ -178,7 +184,9 @@ struct CurlDownloader : public Downloader return 0; } } else if (name == "content-encoding") - encoding = trim(string(line, i + 1));; + encoding = trim(string(line, i + 1)); + else if (name == "accept-ranges" && toLower(trim(std::string(line, i + 1))) == "bytes") + acceptRanges = true; } } return realSize; @@ -296,6 +304,9 @@ struct CurlDownloader : public Downloader curl_easy_setopt(req, CURLOPT_NETRC_FILE, settings.netrcFile.get().c_str()); curl_easy_setopt(req, CURLOPT_NETRC, CURL_NETRC_OPTIONAL); + if (writtenToSink) + curl_easy_setopt(req, CURLOPT_RESUME_FROM_LARGE, writtenToSink); + result.data = std::make_shared(); result.bodySize = 0; } @@ -330,7 +341,7 @@ struct CurlDownloader : public Downloader failEx(writeException); else if (code == CURLE_OK && - (httpStatus == 200 || httpStatus == 201 || httpStatus == 204 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */)) + (httpStatus == 200 || httpStatus == 201 || httpStatus == 204 || httpStatus == 206 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */)) { result.cached = httpStatus == 304; done = true; @@ -403,10 +414,20 @@ struct CurlDownloader : public Downloader request.verb(), request.uri, curl_easy_strerror(code), code)); /* If this is a transient error, then maybe retry the - download after a while. */ - if (err == Transient && attempt < request.tries) { + download after a while. If we're writing to a + sink, we can only retry if the server supports + ranged requests. 
*/ + if (err == Transient + && attempt < request.tries + && (!this->request.dataCallback + || writtenToSink == 0 + || (acceptRanges && encoding.empty()))) + { int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(downloader.mt19937)); - warn("%s; retrying in %d ms", exc.what(), ms); + if (writtenToSink) + warn("%s; retrying from offset %d in %d ms", exc.what(), writtenToSink, ms); + else + warn("%s; retrying in %d ms", exc.what(), ms); embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms); downloader.enqueueItem(shared_from_this()); }