From cd391206e6d29608bc3e25301b9921f1ac189086 Mon Sep 17 00:00:00 2001 From: Nikola Knezevic Date: Mon, 6 Apr 2020 22:57:09 +0200 Subject: [PATCH] {get,make,new}Downloader -> DataTransfer --- src/libstore/builtins/fetchurl.cc | 6 ++-- src/libstore/datatransfer.cc | 36 ++++++++++++------------ src/libstore/datatransfer.hh | 18 ++++++------ src/libstore/http-binary-cache-store.cc | 14 ++++----- src/nix-prefetch-url/nix-prefetch-url.cc | 2 +- src/nix/upgrade-nix.cc | 2 +- 6 files changed, 39 insertions(+), 39 deletions(-) diff --git a/src/libstore/builtins/fetchurl.cc b/src/libstore/builtins/fetchurl.cc index 01d21fa98..0b6db8f0c 100644 --- a/src/libstore/builtins/fetchurl.cc +++ b/src/libstore/builtins/fetchurl.cc @@ -26,9 +26,9 @@ void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData) auto mainUrl = getAttr("url"); bool unpack = get(drv.env, "unpack").value_or("") == "1"; - /* Note: have to use a fresh downloader here because we're in + /* Note: have to use a fresh dataTransfer here because we're in a forked process. */ - auto downloader = makeDownloader(); + auto dataTransfer = makeDataTransfer(); auto fetch = [&](const std::string & url) { @@ -42,7 +42,7 @@ void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData) auto decompressor = makeDecompressionSink( unpack && hasSuffix(mainUrl, ".xz") ? "xz" : "none", sink); - downloader->download(std::move(request), *decompressor); + dataTransfer->download(std::move(request), *decompressor); decompressor->finish(); }); diff --git a/src/libstore/datatransfer.cc b/src/libstore/datatransfer.cc index 9475c24ec..6ea9f9324 100644 --- a/src/libstore/datatransfer.cc +++ b/src/libstore/datatransfer.cc @@ -39,7 +39,7 @@ std::string resolveUri(const std::string & uri) return uri; } -struct CurlDownloader : public Downloader +struct curlDataTransfer : public DataTransfer { CURLM * curlm = 0; @@ -48,7 +48,7 @@ struct CurlDownloader : public Downloader struct DownloadItem : public std::enable_shared_from_this { - CurlDownloader & downloader; + curlDataTransfer & dataTransfer; DataTransferRequest request; DataTransferResult result; Activity act; @@ -72,10 +72,10 @@ struct CurlDownloader : public Downloader curl_off_t writtenToSink = 0; - DownloadItem(CurlDownloader & downloader, + DownloadItem(curlDataTransfer & dataTransfer, const DataTransferRequest & request, Callback && callback) - : downloader(downloader) + : dataTransfer(dataTransfer) , request(request) , act(*logger, lvlTalkative, actDownload, fmt(request.data ? "uploading '%s'" : "downloading '%s'", request.uri), @@ -106,7 +106,7 @@ struct CurlDownloader : public Downloader { if (req) { if (active) - curl_multi_remove_handle(downloader.curlm, req); + curl_multi_remove_handle(dataTransfer.curlm, req); curl_easy_cleanup(req); } if (requestHeaders) curl_slist_free_all(requestHeaders); @@ -422,13 +422,13 @@ struct CurlDownloader : public Downloader || writtenToSink == 0 || (acceptRanges && encoding.empty()))) { - int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(downloader.mt19937)); + int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(dataTransfer.mt19937)); if (writtenToSink) warn("%s; retrying from offset %d in %d ms", exc.what(), writtenToSink, ms); else warn("%s; retrying in %d ms", exc.what(), ms); embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms); - downloader.enqueueItem(shared_from_this()); + dataTransfer.enqueueItem(shared_from_this()); } else fail(exc); @@ -456,7 +456,7 @@ struct CurlDownloader : public Downloader std::thread workerThread; - CurlDownloader() + curlDataTransfer() : mt19937(rd()) { static std::once_flag globalInit; @@ -478,7 +478,7 @@ struct CurlDownloader : public Downloader workerThread = std::thread([&]() { workerThreadEntry(); }); } - ~CurlDownloader() + ~curlDataTransfer() { stopWorkerThread(); @@ -676,18 +676,18 @@ struct CurlDownloader : public Downloader } }; -ref getDownloader() +ref getDataTransfer() { - static ref downloader = makeDownloader(); - return downloader; + static ref dataTransfer = makeDataTransfer(); + return dataTransfer; } -ref makeDownloader() +ref makeDataTransfer() { - return make_ref(); + return make_ref(); } -std::future Downloader::enqueueDataTransfer(const DataTransferRequest & request) +std::future DataTransfer::enqueueDataTransfer(const DataTransferRequest & request) { auto promise = std::make_shared>(); enqueueDataTransfer(request, @@ -701,15 +701,15 @@ std::future Downloader::enqueueDataTransfer(const DataTransf return promise->get_future(); } -DataTransferResult Downloader::download(const DataTransferRequest & request) +DataTransferResult DataTransfer::download(const DataTransferRequest & request) { return enqueueDataTransfer(request).get(); } -void Downloader::download(DataTransferRequest && request, Sink & sink) +void DataTransfer::download(DataTransferRequest && request, Sink & sink) { /* Note: we can't call 'sink' via request.dataCallback, because - that would cause the sink to execute on the downloader + that would cause the sink to execute on the dataTransfer thread. If 'sink' is a coroutine, this will fail. Also, if the sink is expensive (e.g. one that does decompression and writing to the Nix store), it would stall the download thread too much. diff --git a/src/libstore/datatransfer.hh b/src/libstore/datatransfer.hh index 53159bdb4..e1c5196e0 100644 --- a/src/libstore/datatransfer.hh +++ b/src/libstore/datatransfer.hh @@ -67,11 +67,11 @@ struct DataTransferResult class Store; -struct Downloader +struct DataTransfer { - virtual ~Downloader() { } + virtual ~DataTransfer() { } - /* Enqueue a download request, returning a future to the result of + /* Enqueue a data transfer request, returning a future to the result of the download. The future may throw a DownloadError exception. */ virtual void enqueueDataTransfer(const DataTransferRequest & request, @@ -89,18 +89,18 @@ struct Downloader enum Error { NotFound, Forbidden, Misc, Transient, Interrupted }; }; -/* Return a shared Downloader object. Using this object is preferred +/* Return a shared DataTransfer object. Using this object is preferred because it enables connection reuse and HTTP/2 multiplexing. */ -ref getDownloader(); +ref getDataTransfer(); -/* Return a new Downloader object. */ -ref makeDownloader(); +/* Return a new DataTransfer object. */ +ref makeDataTransfer(); class DownloadError : public Error { public: - Downloader::Error error; - DownloadError(Downloader::Error error, const FormatOrString & fs) + DataTransfer::Error error; + DownloadError(DataTransfer::Error error, const FormatOrString & fs) : Error(fs), error(error) { } }; diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc index 861287a33..9a19af5e0 100644 --- a/src/libstore/http-binary-cache-store.cc +++ b/src/libstore/http-binary-cache-store.cc @@ -87,12 +87,12 @@ protected: try { DataTransferRequest request(cacheUri + "/" + path); request.head = true; - getDownloader()->download(request); + getDataTransfer()->download(request); return true; } catch (DownloadError & e) { /* S3 buckets return 403 if a file doesn't exist and the bucket is unlistable, so treat 403 as 404. */ - if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) + if (e.error == DataTransfer::NotFound || e.error == DataTransfer::Forbidden) return false; maybeDisable(); throw; @@ -107,7 +107,7 @@ protected: req.data = std::make_shared(data); // FIXME: inefficient req.mimeType = mimeType; try { - getDownloader()->download(req); + getDataTransfer()->download(req); } catch (DownloadError & e) { throw UploadToHTTP("while uploading to HTTP binary cache at '%s': %s", cacheUri, e.msg()); } @@ -124,9 +124,9 @@ protected: checkEnabled(); auto request(makeRequest(path)); try { - getDownloader()->download(std::move(request), sink); + getDataTransfer()->download(std::move(request), sink); } catch (DownloadError & e) { - if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) + if (e.error == DataTransfer::NotFound || e.error == DataTransfer::Forbidden) throw NoSuchBinaryCacheFile("file '%s' does not exist in binary cache '%s'", path, getUri()); maybeDisable(); throw; @@ -142,12 +142,12 @@ protected: auto callbackPtr = std::make_shared(std::move(callback)); - getDownloader()->enqueueDataTransfer(request, + getDataTransfer()->enqueueDataTransfer(request, {[callbackPtr, this](std::future result) { try { (*callbackPtr)(result.get().data); } catch (DownloadError & e) { - if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) + if (e.error == DataTransfer::NotFound || e.error == DataTransfer::Forbidden) return (*callbackPtr)(std::shared_ptr()); maybeDisable(); callbackPtr->rethrow(); diff --git a/src/nix-prefetch-url/nix-prefetch-url.cc b/src/nix-prefetch-url/nix-prefetch-url.cc index 58a180e72..4c81e7d82 100644 --- a/src/nix-prefetch-url/nix-prefetch-url.cc +++ b/src/nix-prefetch-url/nix-prefetch-url.cc @@ -182,7 +182,7 @@ static int _main(int argc, char * * argv) DataTransferRequest req(actualUri); req.decompress = false; - getDownloader()->download(std::move(req), sink); + getDataTransfer()->download(std::move(req), sink); } /* Optionally unpack the file. */ diff --git a/src/nix/upgrade-nix.cc b/src/nix/upgrade-nix.cc index 414689ec5..a4eda90b4 100644 --- a/src/nix/upgrade-nix.cc +++ b/src/nix/upgrade-nix.cc @@ -139,7 +139,7 @@ struct CmdUpgradeNix : MixDryRun, StoreCommand // FIXME: use nixos.org? auto req = DataTransferRequest(storePathsUrl); - auto res = getDownloader()->download(req); + auto res = getDataTransfer()->download(req); auto state = std::make_unique(Strings(), store); auto v = state->allocValue();