Merge pull request #3476 from knl/rename-download-to-filetransfer

Rename download to filetransfer
This commit is contained in:
Domen Kožar 2020-04-09 09:02:35 +02:00 committed by GitHub
commit 74f94d6640
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 150 additions and 143 deletions

View file

@ -1,6 +1,6 @@
#include "common-eval-args.hh" #include "common-eval-args.hh"
#include "shared.hh" #include "shared.hh"
#include "download.hh" #include "filetransfer.hh"
#include "util.hh" #include "util.hh"
#include "eval.hh" #include "eval.hh"
#include "fetchers.hh" #include "fetchers.hh"

View file

@ -5,7 +5,7 @@
#include "derivations.hh" #include "derivations.hh"
#include "globals.hh" #include "globals.hh"
#include "eval-inline.hh" #include "eval-inline.hh"
#include "download.hh" #include "filetransfer.hh"
#include "json.hh" #include "json.hh"
#include "function-trace.hh" #include "function-trace.hh"

View file

@ -544,7 +544,7 @@ formal
#include <unistd.h> #include <unistd.h>
#include "eval.hh" #include "eval.hh"
#include "download.hh" #include "filetransfer.hh"
#include "fetchers.hh" #include "fetchers.hh"
#include "store-api.hh" #include "store-api.hh"
@ -690,7 +690,7 @@ std::pair<bool, std::string> EvalState::resolveSearchPathElem(const SearchPathEl
try { try {
res = { true, store->toRealPath(fetchers::downloadTarball( res = { true, store->toRealPath(fetchers::downloadTarball(
store, resolveUri(elem.second), "source", false).storePath) }; store, resolveUri(elem.second), "source", false).storePath) };
} catch (DownloadError & e) { } catch (FileTransferError & e) {
printError(format("warning: Nix search path entry '%1%' cannot be downloaded, ignoring") % elem.second); printError(format("warning: Nix search path entry '%1%' cannot be downloaded, ignoring") % elem.second);
res = { false, "" }; res = { false, "" };
} }

View file

@ -2,7 +2,7 @@
#include "eval-inline.hh" #include "eval-inline.hh"
#include "store-api.hh" #include "store-api.hh"
#include "fetchers.hh" #include "fetchers.hh"
#include "download.hh" #include "filetransfer.hh"
#include <ctime> #include <ctime>
#include <iomanip> #include <iomanip>

View file

@ -1,4 +1,4 @@
#include "download.hh" #include "filetransfer.hh"
#include "cache.hh" #include "cache.hh"
#include "fetchers.hh" #include "fetchers.hh"
#include "globals.hh" #include "globals.hh"

View file

@ -1,6 +1,6 @@
#include "fetchers.hh" #include "fetchers.hh"
#include "cache.hh" #include "cache.hh"
#include "download.hh" #include "filetransfer.hh"
#include "globals.hh" #include "globals.hh"
#include "store-api.hh" #include "store-api.hh"
#include "archive.hh" #include "archive.hh"
@ -36,13 +36,13 @@ DownloadFileResult downloadFile(
if (cached && !cached->expired) if (cached && !cached->expired)
return useCached(); return useCached();
DownloadRequest request(url); FileTransferRequest request(url);
if (cached) if (cached)
request.expectedETag = getStrAttr(cached->infoAttrs, "etag"); request.expectedETag = getStrAttr(cached->infoAttrs, "etag");
DownloadResult res; FileTransferResult res;
try { try {
res = getDownloader()->download(request); res = getFileTransfer()->download(request);
} catch (DownloadError & e) { } catch (FileTransferError & e) {
if (cached) { if (cached) {
warn("%s; using cached version", e.msg()); warn("%s; using cached version", e.msg());
return useCached(); return useCached();

View file

@ -7,7 +7,7 @@
#include "affinity.hh" #include "affinity.hh"
#include "builtins.hh" #include "builtins.hh"
#include "builtins/buildenv.hh" #include "builtins/buildenv.hh"
#include "download.hh" #include "filetransfer.hh"
#include "finally.hh" #include "finally.hh"
#include "compression.hh" #include "compression.hh"
#include "json.hh" #include "json.hh"
@ -361,7 +361,7 @@ public:
{ {
actDerivations.progress(doneBuilds, expectedBuilds + doneBuilds, runningBuilds, failedBuilds); actDerivations.progress(doneBuilds, expectedBuilds + doneBuilds, runningBuilds, failedBuilds);
actSubstitutions.progress(doneSubstitutions, expectedSubstitutions + doneSubstitutions, runningSubstitutions, failedSubstitutions); actSubstitutions.progress(doneSubstitutions, expectedSubstitutions + doneSubstitutions, runningSubstitutions, failedSubstitutions);
act.setExpected(actDownload, expectedDownloadSize + doneDownloadSize); act.setExpected(actFileTransfer, expectedDownloadSize + doneDownloadSize);
act.setExpected(actCopyPath, expectedNarSize + doneNarSize); act.setExpected(actCopyPath, expectedNarSize + doneNarSize);
} }
}; };

View file

@ -1,5 +1,5 @@
#include "builtins.hh" #include "builtins.hh"
#include "download.hh" #include "filetransfer.hh"
#include "store-api.hh" #include "store-api.hh"
#include "archive.hh" #include "archive.hh"
#include "compression.hh" #include "compression.hh"
@ -26,9 +26,9 @@ void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData)
auto mainUrl = getAttr("url"); auto mainUrl = getAttr("url");
bool unpack = get(drv.env, "unpack").value_or("") == "1"; bool unpack = get(drv.env, "unpack").value_or("") == "1";
/* Note: have to use a fresh downloader here because we're in /* Note: have to use a fresh fileTransfer here because we're in
a forked process. */ a forked process. */
auto downloader = makeDownloader(); auto fileTransfer = makeFileTransfer();
auto fetch = [&](const std::string & url) { auto fetch = [&](const std::string & url) {
@ -36,13 +36,13 @@ void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData)
/* No need to do TLS verification, because we check the hash of /* No need to do TLS verification, because we check the hash of
the result anyway. */ the result anyway. */
DownloadRequest request(url); FileTransferRequest request(url);
request.verifyTLS = false; request.verifyTLS = false;
request.decompress = false; request.decompress = false;
auto decompressor = makeDecompressionSink( auto decompressor = makeDecompressionSink(
unpack && hasSuffix(mainUrl, ".xz") ? "xz" : "none", sink); unpack && hasSuffix(mainUrl, ".xz") ? "xz" : "none", sink);
downloader->download(std::move(request), *decompressor); fileTransfer->download(std::move(request), *decompressor);
decompressor->finish(); decompressor->finish();
}); });

View file

@ -1,4 +1,4 @@
#include "download.hh" #include "filetransfer.hh"
#include "util.hh" #include "util.hh"
#include "globals.hh" #include "globals.hh"
#include "store-api.hh" #include "store-api.hh"
@ -27,9 +27,9 @@ using namespace std::string_literals;
namespace nix { namespace nix {
DownloadSettings downloadSettings; FileTransferSettings fileTransferSettings;
static GlobalConfig::Register r1(&downloadSettings); static GlobalConfig::Register r1(&fileTransferSettings);
std::string resolveUri(const std::string & uri) std::string resolveUri(const std::string & uri)
{ {
@ -39,21 +39,21 @@ std::string resolveUri(const std::string & uri)
return uri; return uri;
} }
struct CurlDownloader : public Downloader struct curlFileTransfer : public FileTransfer
{ {
CURLM * curlm = 0; CURLM * curlm = 0;
std::random_device rd; std::random_device rd;
std::mt19937 mt19937; std::mt19937 mt19937;
struct DownloadItem : public std::enable_shared_from_this<DownloadItem> struct TransferItem : public std::enable_shared_from_this<TransferItem>
{ {
CurlDownloader & downloader; curlFileTransfer & fileTransfer;
DownloadRequest request; FileTransferRequest request;
DownloadResult result; FileTransferResult result;
Activity act; Activity act;
bool done = false; // whether either the success or failure function has been called bool done = false; // whether either the success or failure function has been called
Callback<DownloadResult> callback; Callback<FileTransferResult> callback;
CURL * req = 0; CURL * req = 0;
bool active = false; // whether the handle has been added to the multi object bool active = false; // whether the handle has been added to the multi object
std::string status; std::string status;
@ -72,12 +72,12 @@ struct CurlDownloader : public Downloader
curl_off_t writtenToSink = 0; curl_off_t writtenToSink = 0;
DownloadItem(CurlDownloader & downloader, TransferItem(curlFileTransfer & fileTransfer,
const DownloadRequest & request, const FileTransferRequest & request,
Callback<DownloadResult> && callback) Callback<FileTransferResult> && callback)
: downloader(downloader) : fileTransfer(fileTransfer)
, request(request) , request(request)
, act(*logger, lvlTalkative, actDownload, , act(*logger, lvlTalkative, actFileTransfer,
fmt(request.data ? "uploading '%s'" : "downloading '%s'", request.uri), fmt(request.data ? "uploading '%s'" : "downloading '%s'", request.uri),
{request.uri}, request.parentAct) {request.uri}, request.parentAct)
, callback(std::move(callback)) , callback(std::move(callback))
@ -102,17 +102,17 @@ struct CurlDownloader : public Downloader
requestHeaders = curl_slist_append(requestHeaders, ("Content-Type: " + request.mimeType).c_str()); requestHeaders = curl_slist_append(requestHeaders, ("Content-Type: " + request.mimeType).c_str());
} }
~DownloadItem() ~TransferItem()
{ {
if (req) { if (req) {
if (active) if (active)
curl_multi_remove_handle(downloader.curlm, req); curl_multi_remove_handle(fileTransfer.curlm, req);
curl_easy_cleanup(req); curl_easy_cleanup(req);
} }
if (requestHeaders) curl_slist_free_all(requestHeaders); if (requestHeaders) curl_slist_free_all(requestHeaders);
try { try {
if (!done) if (!done)
fail(DownloadError(Interrupted, format("download of '%s' was interrupted") % request.uri)); fail(FileTransferError(Interrupted, format("download of '%s' was interrupted") % request.uri));
} catch (...) { } catch (...) {
ignoreException(); ignoreException();
} }
@ -156,7 +156,7 @@ struct CurlDownloader : public Downloader
static size_t writeCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp) static size_t writeCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp)
{ {
return ((DownloadItem *) userp)->writeCallback(contents, size, nmemb); return ((TransferItem *) userp)->writeCallback(contents, size, nmemb);
} }
size_t headerCallback(void * contents, size_t size, size_t nmemb) size_t headerCallback(void * contents, size_t size, size_t nmemb)
@ -198,7 +198,7 @@ struct CurlDownloader : public Downloader
static size_t headerCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp) static size_t headerCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp)
{ {
return ((DownloadItem *) userp)->headerCallback(contents, size, nmemb); return ((TransferItem *) userp)->headerCallback(contents, size, nmemb);
} }
int progressCallback(double dltotal, double dlnow) int progressCallback(double dltotal, double dlnow)
@ -213,7 +213,7 @@ struct CurlDownloader : public Downloader
static int progressCallbackWrapper(void * userp, double dltotal, double dlnow, double ultotal, double ulnow) static int progressCallbackWrapper(void * userp, double dltotal, double dlnow, double ultotal, double ulnow)
{ {
return ((DownloadItem *) userp)->progressCallback(dltotal, dlnow); return ((TransferItem *) userp)->progressCallback(dltotal, dlnow);
} }
static int debugCallback(CURL * handle, curl_infotype type, char * data, size_t size, void * userptr) static int debugCallback(CURL * handle, curl_infotype type, char * data, size_t size, void * userptr)
@ -237,7 +237,7 @@ struct CurlDownloader : public Downloader
static size_t readCallbackWrapper(char *buffer, size_t size, size_t nitems, void * userp) static size_t readCallbackWrapper(char *buffer, size_t size, size_t nitems, void * userp)
{ {
return ((DownloadItem *) userp)->readCallback(buffer, size, nitems); return ((TransferItem *) userp)->readCallback(buffer, size, nitems);
} }
void init() void init()
@ -248,7 +248,7 @@ struct CurlDownloader : public Downloader
if (verbosity >= lvlVomit) { if (verbosity >= lvlVomit) {
curl_easy_setopt(req, CURLOPT_VERBOSE, 1); curl_easy_setopt(req, CURLOPT_VERBOSE, 1);
curl_easy_setopt(req, CURLOPT_DEBUGFUNCTION, DownloadItem::debugCallback); curl_easy_setopt(req, CURLOPT_DEBUGFUNCTION, TransferItem::debugCallback);
} }
curl_easy_setopt(req, CURLOPT_URL, request.uri.c_str()); curl_easy_setopt(req, CURLOPT_URL, request.uri.c_str());
@ -257,19 +257,19 @@ struct CurlDownloader : public Downloader
curl_easy_setopt(req, CURLOPT_NOSIGNAL, 1); curl_easy_setopt(req, CURLOPT_NOSIGNAL, 1);
curl_easy_setopt(req, CURLOPT_USERAGENT, curl_easy_setopt(req, CURLOPT_USERAGENT,
("curl/" LIBCURL_VERSION " Nix/" + nixVersion + ("curl/" LIBCURL_VERSION " Nix/" + nixVersion +
(downloadSettings.userAgentSuffix != "" ? " " + downloadSettings.userAgentSuffix.get() : "")).c_str()); (fileTransferSettings.userAgentSuffix != "" ? " " + fileTransferSettings.userAgentSuffix.get() : "")).c_str());
#if LIBCURL_VERSION_NUM >= 0x072b00 #if LIBCURL_VERSION_NUM >= 0x072b00
curl_easy_setopt(req, CURLOPT_PIPEWAIT, 1); curl_easy_setopt(req, CURLOPT_PIPEWAIT, 1);
#endif #endif
#if LIBCURL_VERSION_NUM >= 0x072f00 #if LIBCURL_VERSION_NUM >= 0x072f00
if (downloadSettings.enableHttp2) if (fileTransferSettings.enableHttp2)
curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2TLS); curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2TLS);
else else
curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1); curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
#endif #endif
curl_easy_setopt(req, CURLOPT_WRITEFUNCTION, DownloadItem::writeCallbackWrapper); curl_easy_setopt(req, CURLOPT_WRITEFUNCTION, TransferItem::writeCallbackWrapper);
curl_easy_setopt(req, CURLOPT_WRITEDATA, this); curl_easy_setopt(req, CURLOPT_WRITEDATA, this);
curl_easy_setopt(req, CURLOPT_HEADERFUNCTION, DownloadItem::headerCallbackWrapper); curl_easy_setopt(req, CURLOPT_HEADERFUNCTION, TransferItem::headerCallbackWrapper);
curl_easy_setopt(req, CURLOPT_HEADERDATA, this); curl_easy_setopt(req, CURLOPT_HEADERDATA, this);
curl_easy_setopt(req, CURLOPT_PROGRESSFUNCTION, progressCallbackWrapper); curl_easy_setopt(req, CURLOPT_PROGRESSFUNCTION, progressCallbackWrapper);
@ -297,10 +297,10 @@ struct CurlDownloader : public Downloader
curl_easy_setopt(req, CURLOPT_SSL_VERIFYHOST, 0); curl_easy_setopt(req, CURLOPT_SSL_VERIFYHOST, 0);
} }
curl_easy_setopt(req, CURLOPT_CONNECTTIMEOUT, downloadSettings.connectTimeout.get()); curl_easy_setopt(req, CURLOPT_CONNECTTIMEOUT, fileTransferSettings.connectTimeout.get());
curl_easy_setopt(req, CURLOPT_LOW_SPEED_LIMIT, 1L); curl_easy_setopt(req, CURLOPT_LOW_SPEED_LIMIT, 1L);
curl_easy_setopt(req, CURLOPT_LOW_SPEED_TIME, downloadSettings.stalledDownloadTimeout.get()); curl_easy_setopt(req, CURLOPT_LOW_SPEED_TIME, fileTransferSettings.stalledDownloadTimeout.get());
/* If no file exist in the specified path, curl continues to work /* If no file exist in the specified path, curl continues to work
anyway as if netrc support was disabled. */ anyway as if netrc support was disabled. */
@ -401,14 +401,14 @@ struct CurlDownloader : public Downloader
auto exc = auto exc =
code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted
? DownloadError(Interrupted, fmt("%s of '%s' was interrupted", request.verb(), request.uri)) ? FileTransferError(Interrupted, fmt("%s of '%s' was interrupted", request.verb(), request.uri))
: httpStatus != 0 : httpStatus != 0
? DownloadError(err, ? FileTransferError(err,
fmt("unable to %s '%s': HTTP error %d", fmt("unable to %s '%s': HTTP error %d",
request.verb(), request.uri, httpStatus) request.verb(), request.uri, httpStatus)
+ (code == CURLE_OK ? "" : fmt(" (curl error: %s)", curl_easy_strerror(code))) + (code == CURLE_OK ? "" : fmt(" (curl error: %s)", curl_easy_strerror(code)))
) )
: DownloadError(err, : FileTransferError(err,
fmt("unable to %s '%s': %s (%d)", fmt("unable to %s '%s': %s (%d)",
request.verb(), request.uri, curl_easy_strerror(code), code)); request.verb(), request.uri, curl_easy_strerror(code), code));
@ -422,13 +422,13 @@ struct CurlDownloader : public Downloader
|| writtenToSink == 0 || writtenToSink == 0
|| (acceptRanges && encoding.empty()))) || (acceptRanges && encoding.empty())))
{ {
int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(downloader.mt19937)); int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(fileTransfer.mt19937));
if (writtenToSink) if (writtenToSink)
warn("%s; retrying from offset %d in %d ms", exc.what(), writtenToSink, ms); warn("%s; retrying from offset %d in %d ms", exc.what(), writtenToSink, ms);
else else
warn("%s; retrying in %d ms", exc.what(), ms); warn("%s; retrying in %d ms", exc.what(), ms);
embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms); embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms);
downloader.enqueueItem(shared_from_this()); fileTransfer.enqueueItem(shared_from_this());
} }
else else
fail(exc); fail(exc);
@ -439,12 +439,12 @@ struct CurlDownloader : public Downloader
struct State struct State
{ {
struct EmbargoComparator { struct EmbargoComparator {
bool operator() (const std::shared_ptr<DownloadItem> & i1, const std::shared_ptr<DownloadItem> & i2) { bool operator() (const std::shared_ptr<TransferItem> & i1, const std::shared_ptr<TransferItem> & i2) {
return i1->embargo > i2->embargo; return i1->embargo > i2->embargo;
} }
}; };
bool quit = false; bool quit = false;
std::priority_queue<std::shared_ptr<DownloadItem>, std::vector<std::shared_ptr<DownloadItem>>, EmbargoComparator> incoming; std::priority_queue<std::shared_ptr<TransferItem>, std::vector<std::shared_ptr<TransferItem>>, EmbargoComparator> incoming;
}; };
Sync<State> state_; Sync<State> state_;
@ -456,7 +456,7 @@ struct CurlDownloader : public Downloader
std::thread workerThread; std::thread workerThread;
CurlDownloader() curlFileTransfer()
: mt19937(rd()) : mt19937(rd())
{ {
static std::once_flag globalInit; static std::once_flag globalInit;
@ -469,7 +469,7 @@ struct CurlDownloader : public Downloader
#endif #endif
#if LIBCURL_VERSION_NUM >= 0x071e00 // Max connections requires >= 7.30.0 #if LIBCURL_VERSION_NUM >= 0x071e00 // Max connections requires >= 7.30.0
curl_multi_setopt(curlm, CURLMOPT_MAX_TOTAL_CONNECTIONS, curl_multi_setopt(curlm, CURLMOPT_MAX_TOTAL_CONNECTIONS,
downloadSettings.httpConnections.get()); fileTransferSettings.httpConnections.get());
#endif #endif
wakeupPipe.create(); wakeupPipe.create();
@ -478,7 +478,7 @@ struct CurlDownloader : public Downloader
workerThread = std::thread([&]() { workerThreadEntry(); }); workerThread = std::thread([&]() { workerThreadEntry(); });
} }
~CurlDownloader() ~curlFileTransfer()
{ {
stopWorkerThread(); stopWorkerThread();
@ -504,7 +504,7 @@ struct CurlDownloader : public Downloader
stopWorkerThread(); stopWorkerThread();
}); });
std::map<CURL *, std::shared_ptr<DownloadItem>> items; std::map<CURL *, std::shared_ptr<TransferItem>> items;
bool quit = false; bool quit = false;
@ -561,7 +561,7 @@ struct CurlDownloader : public Downloader
throw SysError("reading curl wakeup socket"); throw SysError("reading curl wakeup socket");
} }
std::vector<std::shared_ptr<DownloadItem>> incoming; std::vector<std::shared_ptr<TransferItem>> incoming;
auto now = std::chrono::steady_clock::now(); auto now = std::chrono::steady_clock::now();
{ {
@ -609,7 +609,7 @@ struct CurlDownloader : public Downloader
} }
} }
void enqueueItem(std::shared_ptr<DownloadItem> item) void enqueueItem(std::shared_ptr<TransferItem> item)
{ {
if (item->request.data if (item->request.data
&& !hasPrefix(item->request.uri, "http://") && !hasPrefix(item->request.uri, "http://")
@ -641,8 +641,8 @@ struct CurlDownloader : public Downloader
} }
#endif #endif
void enqueueDownload(const DownloadRequest & request, void enqueueFileTransfer(const FileTransferRequest & request,
Callback<DownloadResult> callback) override Callback<FileTransferResult> callback) override
{ {
/* Ugly hack to support s3:// URIs. */ /* Ugly hack to support s3:// URIs. */
if (hasPrefix(request.uri, "s3://")) { if (hasPrefix(request.uri, "s3://")) {
@ -660,9 +660,9 @@ struct CurlDownloader : public Downloader
// FIXME: implement ETag // FIXME: implement ETag
auto s3Res = s3Helper.getObject(bucketName, key); auto s3Res = s3Helper.getObject(bucketName, key);
DownloadResult res; FileTransferResult res;
if (!s3Res.data) if (!s3Res.data)
throw DownloadError(NotFound, fmt("S3 object '%s' does not exist", request.uri)); throw FileTransferError(NotFound, fmt("S3 object '%s' does not exist", request.uri));
res.data = s3Res.data; res.data = s3Res.data;
callback(std::move(res)); callback(std::move(res));
#else #else
@ -672,26 +672,26 @@ struct CurlDownloader : public Downloader
return; return;
} }
enqueueItem(std::make_shared<DownloadItem>(*this, request, std::move(callback))); enqueueItem(std::make_shared<TransferItem>(*this, request, std::move(callback)));
} }
}; };
ref<Downloader> getDownloader() ref<FileTransfer> getFileTransfer()
{ {
static ref<Downloader> downloader = makeDownloader(); static ref<FileTransfer> fileTransfer = makeFileTransfer();
return downloader; return fileTransfer;
} }
ref<Downloader> makeDownloader() ref<FileTransfer> makeFileTransfer()
{ {
return make_ref<CurlDownloader>(); return make_ref<curlFileTransfer>();
} }
std::future<DownloadResult> Downloader::enqueueDownload(const DownloadRequest & request) std::future<FileTransferResult> FileTransfer::enqueueFileTransfer(const FileTransferRequest & request)
{ {
auto promise = std::make_shared<std::promise<DownloadResult>>(); auto promise = std::make_shared<std::promise<FileTransferResult>>();
enqueueDownload(request, enqueueFileTransfer(request,
{[promise](std::future<DownloadResult> fut) { {[promise](std::future<FileTransferResult> fut) {
try { try {
promise->set_value(fut.get()); promise->set_value(fut.get());
} catch (...) { } catch (...) {
@ -701,15 +701,21 @@ std::future<DownloadResult> Downloader::enqueueDownload(const DownloadRequest &
return promise->get_future(); return promise->get_future();
} }
DownloadResult Downloader::download(const DownloadRequest & request) FileTransferResult FileTransfer::download(const FileTransferRequest & request)
{ {
return enqueueDownload(request).get(); return enqueueFileTransfer(request).get();
} }
void Downloader::download(DownloadRequest && request, Sink & sink) FileTransferResult FileTransfer::upload(const FileTransferRequest & request)
{
/* Note: this method is the same as download, but helps in readability */
return enqueueFileTransfer(request).get();
}
void FileTransfer::download(FileTransferRequest && request, Sink & sink)
{ {
/* Note: we can't call 'sink' via request.dataCallback, because /* Note: we can't call 'sink' via request.dataCallback, because
that would cause the sink to execute on the downloader that would cause the sink to execute on the fileTransfer
thread. If 'sink' is a coroutine, this will fail. Also, if the thread. If 'sink' is a coroutine, this will fail. Also, if the
sink is expensive (e.g. one that does decompression and writing sink is expensive (e.g. one that does decompression and writing
to the Nix store), it would stall the download thread too much. to the Nix store), it would stall the download thread too much.
@ -755,8 +761,8 @@ void Downloader::download(DownloadRequest && request, Sink & sink)
state->avail.notify_one(); state->avail.notify_one();
}; };
enqueueDownload(request, enqueueFileTransfer(request,
{[_state](std::future<DownloadResult> fut) { {[_state](std::future<FileTransferResult> fut) {
auto state(_state->lock()); auto state(_state->lock());
state->quit = true; state->quit = true;
try { try {
@ -801,7 +807,6 @@ void Downloader::download(DownloadRequest && request, Sink & sink)
} }
} }
bool isUri(const string & s) bool isUri(const string & s)
{ {
if (s.compare(0, 8, "channel:") == 0) return true; if (s.compare(0, 8, "channel:") == 0) return true;

View file

@ -9,7 +9,7 @@
namespace nix { namespace nix {
struct DownloadSettings : Config struct FileTransferSettings : Config
{ {
Setting<bool> enableHttp2{this, true, "http2", Setting<bool> enableHttp2{this, true, "http2",
"Whether to enable HTTP/2 support."}; "Whether to enable HTTP/2 support."};
@ -31,15 +31,15 @@ struct DownloadSettings : Config
"How often Nix will attempt to download a file before giving up."}; "How often Nix will attempt to download a file before giving up."};
}; };
extern DownloadSettings downloadSettings; extern FileTransferSettings fileTransferSettings;
struct DownloadRequest struct FileTransferRequest
{ {
std::string uri; std::string uri;
std::string expectedETag; std::string expectedETag;
bool verifyTLS = true; bool verifyTLS = true;
bool head = false; bool head = false;
size_t tries = downloadSettings.tries; size_t tries = fileTransferSettings.tries;
unsigned int baseRetryTimeMs = 250; unsigned int baseRetryTimeMs = 250;
ActivityId parentAct; ActivityId parentAct;
bool decompress = true; bool decompress = true;
@ -47,7 +47,7 @@ struct DownloadRequest
std::string mimeType; std::string mimeType;
std::function<void(char *, size_t)> dataCallback; std::function<void(char *, size_t)> dataCallback;
DownloadRequest(const std::string & uri) FileTransferRequest(const std::string & uri)
: uri(uri), parentAct(getCurActivity()) { } : uri(uri), parentAct(getCurActivity()) { }
std::string verb() std::string verb()
@ -56,7 +56,7 @@ struct DownloadRequest
} }
}; };
struct DownloadResult struct FileTransferResult
{ {
bool cached = false; bool cached = false;
std::string etag; std::string etag;
@ -67,40 +67,43 @@ struct DownloadResult
class Store; class Store;
struct Downloader struct FileTransfer
{ {
virtual ~Downloader() { } virtual ~FileTransfer() { }
/* Enqueue a download request, returning a future to the result of /* Enqueue a data transfer request, returning a future to the result of
the download. The future may throw a DownloadError the download. The future may throw a FileTransferError
exception. */ exception. */
virtual void enqueueDownload(const DownloadRequest & request, virtual void enqueueFileTransfer(const FileTransferRequest & request,
Callback<DownloadResult> callback) = 0; Callback<FileTransferResult> callback) = 0;
std::future<DownloadResult> enqueueDownload(const DownloadRequest & request); std::future<FileTransferResult> enqueueFileTransfer(const FileTransferRequest & request);
/* Synchronously download a file. */ /* Synchronously download a file. */
DownloadResult download(const DownloadRequest & request); FileTransferResult download(const FileTransferRequest & request);
/* Synchronously upload a file. */
FileTransferResult upload(const FileTransferRequest & request);
/* Download a file, writing its data to a sink. The sink will be /* Download a file, writing its data to a sink. The sink will be
invoked on the thread of the caller. */ invoked on the thread of the caller. */
void download(DownloadRequest && request, Sink & sink); void download(FileTransferRequest && request, Sink & sink);
enum Error { NotFound, Forbidden, Misc, Transient, Interrupted }; enum Error { NotFound, Forbidden, Misc, Transient, Interrupted };
}; };
/* Return a shared Downloader object. Using this object is preferred /* Return a shared FileTransfer object. Using this object is preferred
because it enables connection reuse and HTTP/2 multiplexing. */ because it enables connection reuse and HTTP/2 multiplexing. */
ref<Downloader> getDownloader(); ref<FileTransfer> getFileTransfer();
/* Return a new Downloader object. */ /* Return a new FileTransfer object. */
ref<Downloader> makeDownloader(); ref<FileTransfer> makeFileTransfer();
class DownloadError : public Error class FileTransferError : public Error
{ {
public: public:
Downloader::Error error; FileTransfer::Error error;
DownloadError(Downloader::Error error, const FormatOrString & fs) FileTransferError(FileTransfer::Error error, const FormatOrString & fs)
: Error(fs), error(error) : Error(fs), error(error)
{ } { }
}; };

View file

@ -1,5 +1,5 @@
#include "binary-cache-store.hh" #include "binary-cache-store.hh"
#include "download.hh" #include "filetransfer.hh"
#include "globals.hh" #include "globals.hh"
#include "nar-info-disk-cache.hh" #include "nar-info-disk-cache.hh"
@ -85,14 +85,14 @@ protected:
checkEnabled(); checkEnabled();
try { try {
DownloadRequest request(cacheUri + "/" + path); FileTransferRequest request(cacheUri + "/" + path);
request.head = true; request.head = true;
getDownloader()->download(request); getFileTransfer()->download(request);
return true; return true;
} catch (DownloadError & e) { } catch (FileTransferError & e) {
/* S3 buckets return 403 if a file doesn't exist and the /* S3 buckets return 403 if a file doesn't exist and the
bucket is unlistable, so treat 403 as 404. */ bucket is unlistable, so treat 403 as 404. */
if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden)
return false; return false;
maybeDisable(); maybeDisable();
throw; throw;
@ -103,19 +103,19 @@ protected:
const std::string & data, const std::string & data,
const std::string & mimeType) override const std::string & mimeType) override
{ {
auto req = DownloadRequest(cacheUri + "/" + path); auto req = FileTransferRequest(cacheUri + "/" + path);
req.data = std::make_shared<string>(data); // FIXME: inefficient req.data = std::make_shared<string>(data); // FIXME: inefficient
req.mimeType = mimeType; req.mimeType = mimeType;
try { try {
getDownloader()->download(req); getFileTransfer()->upload(req);
} catch (DownloadError & e) { } catch (FileTransferError & e) {
throw UploadToHTTP("while uploading to HTTP binary cache at '%s': %s", cacheUri, e.msg()); throw UploadToHTTP("while uploading to HTTP binary cache at '%s': %s", cacheUri, e.msg());
} }
} }
DownloadRequest makeRequest(const std::string & path) FileTransferRequest makeRequest(const std::string & path)
{ {
DownloadRequest request(cacheUri + "/" + path); FileTransferRequest request(cacheUri + "/" + path);
return request; return request;
} }
@ -124,9 +124,9 @@ protected:
checkEnabled(); checkEnabled();
auto request(makeRequest(path)); auto request(makeRequest(path));
try { try {
getDownloader()->download(std::move(request), sink); getFileTransfer()->download(std::move(request), sink);
} catch (DownloadError & e) { } catch (FileTransferError & e) {
if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden)
throw NoSuchBinaryCacheFile("file '%s' does not exist in binary cache '%s'", path, getUri()); throw NoSuchBinaryCacheFile("file '%s' does not exist in binary cache '%s'", path, getUri());
maybeDisable(); maybeDisable();
throw; throw;
@ -142,12 +142,12 @@ protected:
auto callbackPtr = std::make_shared<decltype(callback)>(std::move(callback)); auto callbackPtr = std::make_shared<decltype(callback)>(std::move(callback));
getDownloader()->enqueueDownload(request, getFileTransfer()->enqueueFileTransfer(request,
{[callbackPtr, this](std::future<DownloadResult> result) { {[callbackPtr, this](std::future<FileTransferResult> result) {
try { try {
(*callbackPtr)(result.get().data); (*callbackPtr)(result.get().data);
} catch (DownloadError & e) { } catch (FileTransferError & e) {
if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden)
return (*callbackPtr)(std::shared_ptr<std::string>()); return (*callbackPtr)(std::shared_ptr<std::string>());
maybeDisable(); maybeDisable();
callbackPtr->rethrow(); callbackPtr->rethrow();
@ -174,4 +174,3 @@ static RegisterStoreImplementation regStore([](
}); });
} }

View file

@ -6,7 +6,7 @@
#include "nar-info-disk-cache.hh" #include "nar-info-disk-cache.hh"
#include "globals.hh" #include "globals.hh"
#include "compression.hh" #include "compression.hh"
#include "download.hh" #include "filetransfer.hh"
#include "istringstream_nocopy.hh" #include "istringstream_nocopy.hh"
#include <aws/core/Aws.h> #include <aws/core/Aws.h>
@ -132,7 +132,7 @@ ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig(const string & region
return res; return res;
} }
S3Helper::DownloadResult S3Helper::getObject( S3Helper::FileTransferResult S3Helper::getObject(
const std::string & bucketName, const std::string & key) const std::string & bucketName, const std::string & key)
{ {
debug("fetching 's3://%s/%s'...", bucketName, key); debug("fetching 's3://%s/%s'...", bucketName, key);
@ -146,7 +146,7 @@ S3Helper::DownloadResult S3Helper::getObject(
return Aws::New<std::stringstream>("STRINGSTREAM"); return Aws::New<std::stringstream>("STRINGSTREAM");
}); });
DownloadResult res; FileTransferResult res;
auto now1 = std::chrono::steady_clock::now(); auto now1 = std::chrono::steady_clock::now();

View file

@ -18,13 +18,13 @@ struct S3Helper
ref<Aws::Client::ClientConfiguration> makeConfig(const std::string & region, const std::string & scheme, const std::string & endpoint); ref<Aws::Client::ClientConfiguration> makeConfig(const std::string & region, const std::string & scheme, const std::string & endpoint);
struct DownloadResult struct FileTransferResult
{ {
std::shared_ptr<std::string> data; std::shared_ptr<std::string> data;
unsigned int durationMs; unsigned int durationMs;
}; };
DownloadResult getObject( FileTransferResult getObject(
const std::string & bucketName, const std::string & key); const std::string & bucketName, const std::string & key);
}; };

View file

@ -198,7 +198,7 @@ bool handleJSONLogMessage(const std::string & msg,
if (action == "start") { if (action == "start") {
auto type = (ActivityType) json["type"]; auto type = (ActivityType) json["type"];
if (trusted || type == actDownload) if (trusted || type == actFileTransfer)
activities.emplace(std::piecewise_construct, activities.emplace(std::piecewise_construct,
std::forward_as_tuple(json["id"]), std::forward_as_tuple(json["id"]),
std::forward_as_tuple(*logger, (Verbosity) json["level"], type, std::forward_as_tuple(*logger, (Verbosity) json["level"], type,

View file

@ -17,7 +17,7 @@ typedef enum {
typedef enum { typedef enum {
actUnknown = 0, actUnknown = 0,
actCopyPath = 100, actCopyPath = 100,
actDownload = 101, actFileTransfer = 101,
actRealise = 102, actRealise = 102,
actCopyPaths = 103, actCopyPaths = 103,
actBuilds = 104, actBuilds = 104,

View file

@ -1,6 +1,6 @@
#include "shared.hh" #include "shared.hh"
#include "globals.hh" #include "globals.hh"
#include "download.hh" #include "filetransfer.hh"
#include "store-api.hh" #include "store-api.hh"
#include "../nix/legacy.hh" #include "../nix/legacy.hh"
#include "fetchers.hh" #include "fetchers.hh"
@ -113,7 +113,7 @@ static void update(const StringSet & channelNames)
// Download the channel tarball. // Download the channel tarball.
try { try {
filename = store->toRealPath(fetchers::downloadFile(store, url + "/nixexprs.tar.xz", "nixexprs.tar.xz", false).storePath); filename = store->toRealPath(fetchers::downloadFile(store, url + "/nixexprs.tar.xz", "nixexprs.tar.xz", false).storePath);
} catch (DownloadError & e) { } catch (FileTransferError & e) {
filename = store->toRealPath(fetchers::downloadFile(store, url + "/nixexprs.tar.bz2", "nixexprs.tar.bz2", false).storePath); filename = store->toRealPath(fetchers::downloadFile(store, url + "/nixexprs.tar.bz2", "nixexprs.tar.bz2", false).storePath);
} }
} }

View file

@ -1,6 +1,6 @@
#include "hash.hh" #include "hash.hh"
#include "shared.hh" #include "shared.hh"
#include "download.hh" #include "filetransfer.hh"
#include "store-api.hh" #include "store-api.hh"
#include "eval.hh" #include "eval.hh"
#include "eval-inline.hh" #include "eval-inline.hh"
@ -180,9 +180,9 @@ static int _main(int argc, char * * argv)
FdSink sink(fd.get()); FdSink sink(fd.get());
DownloadRequest req(actualUri); FileTransferRequest req(actualUri);
req.decompress = false; req.decompress = false;
getDownloader()->download(std::move(req), sink); getFileTransfer()->download(std::move(req), sink);
} }
/* Optionally unpack the file. */ /* Optionally unpack the file. */

View file

@ -8,7 +8,7 @@
#include "shared.hh" #include "shared.hh"
#include "store-api.hh" #include "store-api.hh"
#include "progress-bar.hh" #include "progress-bar.hh"
#include "download.hh" #include "filetransfer.hh"
#include "finally.hh" #include "finally.hh"
#include <sys/types.h> #include <sys/types.h>
@ -176,10 +176,10 @@ void mainWrapped(int argc, char * * argv)
settings.useSubstitutes = false; settings.useSubstitutes = false;
if (!settings.tarballTtl.overriden) if (!settings.tarballTtl.overriden)
settings.tarballTtl = std::numeric_limits<unsigned int>::max(); settings.tarballTtl = std::numeric_limits<unsigned int>::max();
if (!downloadSettings.tries.overriden) if (!fileTransferSettings.tries.overriden)
downloadSettings.tries = 0; fileTransferSettings.tries = 0;
if (!downloadSettings.connectTimeout.overriden) if (!fileTransferSettings.connectTimeout.overriden)
downloadSettings.connectTimeout = 1; fileTransferSettings.connectTimeout = 1;
} }
if (args.refresh) if (args.refresh)

View file

@ -190,8 +190,8 @@ public:
i->s = fmt("querying " ANSI_BOLD "%s" ANSI_NORMAL " on %s", name, getS(fields, 1)); i->s = fmt("querying " ANSI_BOLD "%s" ANSI_NORMAL " on %s", name, getS(fields, 1));
} }
if ((type == actDownload && hasAncestor(*state, actCopyPath, parent)) if ((type == actFileTransfer && hasAncestor(*state, actCopyPath, parent))
|| (type == actDownload && hasAncestor(*state, actQueryPathInfo, parent)) || (type == actFileTransfer && hasAncestor(*state, actQueryPathInfo, parent))
|| (type == actCopyPath && hasAncestor(*state, actSubstitute, parent))) || (type == actCopyPath && hasAncestor(*state, actSubstitute, parent)))
i->visible = false; i->visible = false;
@ -416,7 +416,7 @@ public:
if (!s2.empty()) { res += " ("; res += s2; res += ')'; } if (!s2.empty()) { res += " ("; res += s2; res += ')'; }
} }
showActivity(actDownload, "%s MiB DL", "%.1f", MiB); showActivity(actFileTransfer, "%s MiB DL", "%.1f", MiB);
{ {
auto s = renderActivity(actOptimiseStore, "%s paths optimised"); auto s = renderActivity(actOptimiseStore, "%s paths optimised");

View file

@ -1,7 +1,7 @@
#include "command.hh" #include "command.hh"
#include "common-args.hh" #include "common-args.hh"
#include "store-api.hh" #include "store-api.hh"
#include "download.hh" #include "filetransfer.hh"
#include "eval.hh" #include "eval.hh"
#include "attr-path.hh" #include "attr-path.hh"
#include "names.hh" #include "names.hh"
@ -138,8 +138,8 @@ struct CmdUpgradeNix : MixDryRun, StoreCommand
Activity act(*logger, lvlInfo, actUnknown, "querying latest Nix version"); Activity act(*logger, lvlInfo, actUnknown, "querying latest Nix version");
// FIXME: use nixos.org? // FIXME: use nixos.org?
auto req = DownloadRequest(storePathsUrl); auto req = FileTransferRequest(storePathsUrl);
auto res = getDownloader()->download(req); auto res = getFileTransfer()->download(req);
auto state = std::make_unique<EvalState>(Strings(), store); auto state = std::make_unique<EvalState>(Strings(), store);
auto v = state->allocValue(); auto v = state->allocValue();