{get,make,new}Downloader -> DataTransfer

This commit is contained in:
Nikola Knezevic 2020-04-06 22:57:09 +02:00
parent 2df2741ec6
commit cd391206e6
6 changed files with 39 additions and 39 deletions

View file

@ -26,9 +26,9 @@ void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData)
auto mainUrl = getAttr("url"); auto mainUrl = getAttr("url");
bool unpack = get(drv.env, "unpack").value_or("") == "1"; bool unpack = get(drv.env, "unpack").value_or("") == "1";
/* Note: have to use a fresh downloader here because we're in /* Note: have to use a fresh dataTransfer here because we're in
a forked process. */ a forked process. */
auto downloader = makeDownloader(); auto dataTransfer = makeDataTransfer();
auto fetch = [&](const std::string & url) { auto fetch = [&](const std::string & url) {
@ -42,7 +42,7 @@ void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData)
auto decompressor = makeDecompressionSink( auto decompressor = makeDecompressionSink(
unpack && hasSuffix(mainUrl, ".xz") ? "xz" : "none", sink); unpack && hasSuffix(mainUrl, ".xz") ? "xz" : "none", sink);
downloader->download(std::move(request), *decompressor); dataTransfer->download(std::move(request), *decompressor);
decompressor->finish(); decompressor->finish();
}); });

View file

@ -39,7 +39,7 @@ std::string resolveUri(const std::string & uri)
return uri; return uri;
} }
struct CurlDownloader : public Downloader struct curlDataTransfer : public DataTransfer
{ {
CURLM * curlm = 0; CURLM * curlm = 0;
@ -48,7 +48,7 @@ struct CurlDownloader : public Downloader
struct DownloadItem : public std::enable_shared_from_this<DownloadItem> struct DownloadItem : public std::enable_shared_from_this<DownloadItem>
{ {
CurlDownloader & downloader; curlDataTransfer & dataTransfer;
DataTransferRequest request; DataTransferRequest request;
DataTransferResult result; DataTransferResult result;
Activity act; Activity act;
@ -72,10 +72,10 @@ struct CurlDownloader : public Downloader
curl_off_t writtenToSink = 0; curl_off_t writtenToSink = 0;
DownloadItem(CurlDownloader & downloader, DownloadItem(curlDataTransfer & dataTransfer,
const DataTransferRequest & request, const DataTransferRequest & request,
Callback<DataTransferResult> && callback) Callback<DataTransferResult> && callback)
: downloader(downloader) : dataTransfer(dataTransfer)
, request(request) , request(request)
, act(*logger, lvlTalkative, actDownload, , act(*logger, lvlTalkative, actDownload,
fmt(request.data ? "uploading '%s'" : "downloading '%s'", request.uri), fmt(request.data ? "uploading '%s'" : "downloading '%s'", request.uri),
@ -106,7 +106,7 @@ struct CurlDownloader : public Downloader
{ {
if (req) { if (req) {
if (active) if (active)
curl_multi_remove_handle(downloader.curlm, req); curl_multi_remove_handle(dataTransfer.curlm, req);
curl_easy_cleanup(req); curl_easy_cleanup(req);
} }
if (requestHeaders) curl_slist_free_all(requestHeaders); if (requestHeaders) curl_slist_free_all(requestHeaders);
@ -422,13 +422,13 @@ struct CurlDownloader : public Downloader
|| writtenToSink == 0 || writtenToSink == 0
|| (acceptRanges && encoding.empty()))) || (acceptRanges && encoding.empty())))
{ {
int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(downloader.mt19937)); int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(dataTransfer.mt19937));
if (writtenToSink) if (writtenToSink)
warn("%s; retrying from offset %d in %d ms", exc.what(), writtenToSink, ms); warn("%s; retrying from offset %d in %d ms", exc.what(), writtenToSink, ms);
else else
warn("%s; retrying in %d ms", exc.what(), ms); warn("%s; retrying in %d ms", exc.what(), ms);
embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms); embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms);
downloader.enqueueItem(shared_from_this()); dataTransfer.enqueueItem(shared_from_this());
} }
else else
fail(exc); fail(exc);
@ -456,7 +456,7 @@ struct CurlDownloader : public Downloader
std::thread workerThread; std::thread workerThread;
CurlDownloader() curlDataTransfer()
: mt19937(rd()) : mt19937(rd())
{ {
static std::once_flag globalInit; static std::once_flag globalInit;
@ -478,7 +478,7 @@ struct CurlDownloader : public Downloader
workerThread = std::thread([&]() { workerThreadEntry(); }); workerThread = std::thread([&]() { workerThreadEntry(); });
} }
~CurlDownloader() ~curlDataTransfer()
{ {
stopWorkerThread(); stopWorkerThread();
@ -676,18 +676,18 @@ struct CurlDownloader : public Downloader
} }
}; };
ref<Downloader> getDownloader() ref<DataTransfer> getDataTransfer()
{ {
static ref<Downloader> downloader = makeDownloader(); static ref<DataTransfer> dataTransfer = makeDataTransfer();
return downloader; return dataTransfer;
} }
ref<Downloader> makeDownloader() ref<DataTransfer> makeDataTransfer()
{ {
return make_ref<CurlDownloader>(); return make_ref<curlDataTransfer>();
} }
std::future<DataTransferResult> Downloader::enqueueDataTransfer(const DataTransferRequest & request) std::future<DataTransferResult> DataTransfer::enqueueDataTransfer(const DataTransferRequest & request)
{ {
auto promise = std::make_shared<std::promise<DataTransferResult>>(); auto promise = std::make_shared<std::promise<DataTransferResult>>();
enqueueDataTransfer(request, enqueueDataTransfer(request,
@ -701,15 +701,15 @@ std::future<DataTransferResult> Downloader::enqueueDataTransfer(const DataTransf
return promise->get_future(); return promise->get_future();
} }
DataTransferResult Downloader::download(const DataTransferRequest & request) DataTransferResult DataTransfer::download(const DataTransferRequest & request)
{ {
return enqueueDataTransfer(request).get(); return enqueueDataTransfer(request).get();
} }
void Downloader::download(DataTransferRequest && request, Sink & sink) void DataTransfer::download(DataTransferRequest && request, Sink & sink)
{ {
/* Note: we can't call 'sink' via request.dataCallback, because /* Note: we can't call 'sink' via request.dataCallback, because
that would cause the sink to execute on the downloader that would cause the sink to execute on the dataTransfer
thread. If 'sink' is a coroutine, this will fail. Also, if the thread. If 'sink' is a coroutine, this will fail. Also, if the
sink is expensive (e.g. one that does decompression and writing sink is expensive (e.g. one that does decompression and writing
to the Nix store), it would stall the download thread too much. to the Nix store), it would stall the download thread too much.

View file

@ -67,11 +67,11 @@ struct DataTransferResult
class Store; class Store;
struct Downloader struct DataTransfer
{ {
virtual ~Downloader() { } virtual ~DataTransfer() { }
/* Enqueue a download request, returning a future to the result of /* Enqueue a data transfer request, returning a future to the result of
the download. The future may throw a DownloadError the download. The future may throw a DownloadError
exception. */ exception. */
virtual void enqueueDataTransfer(const DataTransferRequest & request, virtual void enqueueDataTransfer(const DataTransferRequest & request,
@ -89,18 +89,18 @@ struct Downloader
enum Error { NotFound, Forbidden, Misc, Transient, Interrupted }; enum Error { NotFound, Forbidden, Misc, Transient, Interrupted };
}; };
/* Return a shared Downloader object. Using this object is preferred /* Return a shared DataTransfer object. Using this object is preferred
because it enables connection reuse and HTTP/2 multiplexing. */ because it enables connection reuse and HTTP/2 multiplexing. */
ref<Downloader> getDownloader(); ref<DataTransfer> getDataTransfer();
/* Return a new Downloader object. */ /* Return a new DataTransfer object. */
ref<Downloader> makeDownloader(); ref<DataTransfer> makeDataTransfer();
class DownloadError : public Error class DownloadError : public Error
{ {
public: public:
Downloader::Error error; DataTransfer::Error error;
DownloadError(Downloader::Error error, const FormatOrString & fs) DownloadError(DataTransfer::Error error, const FormatOrString & fs)
: Error(fs), error(error) : Error(fs), error(error)
{ } { }
}; };

View file

@ -87,12 +87,12 @@ protected:
try { try {
DataTransferRequest request(cacheUri + "/" + path); DataTransferRequest request(cacheUri + "/" + path);
request.head = true; request.head = true;
getDownloader()->download(request); getDataTransfer()->download(request);
return true; return true;
} catch (DownloadError & e) { } catch (DownloadError & e) {
/* S3 buckets return 403 if a file doesn't exist and the /* S3 buckets return 403 if a file doesn't exist and the
bucket is unlistable, so treat 403 as 404. */ bucket is unlistable, so treat 403 as 404. */
if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) if (e.error == DataTransfer::NotFound || e.error == DataTransfer::Forbidden)
return false; return false;
maybeDisable(); maybeDisable();
throw; throw;
@ -107,7 +107,7 @@ protected:
req.data = std::make_shared<string>(data); // FIXME: inefficient req.data = std::make_shared<string>(data); // FIXME: inefficient
req.mimeType = mimeType; req.mimeType = mimeType;
try { try {
getDownloader()->download(req); getDataTransfer()->download(req);
} catch (DownloadError & e) { } catch (DownloadError & e) {
throw UploadToHTTP("while uploading to HTTP binary cache at '%s': %s", cacheUri, e.msg()); throw UploadToHTTP("while uploading to HTTP binary cache at '%s': %s", cacheUri, e.msg());
} }
@ -124,9 +124,9 @@ protected:
checkEnabled(); checkEnabled();
auto request(makeRequest(path)); auto request(makeRequest(path));
try { try {
getDownloader()->download(std::move(request), sink); getDataTransfer()->download(std::move(request), sink);
} catch (DownloadError & e) { } catch (DownloadError & e) {
if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) if (e.error == DataTransfer::NotFound || e.error == DataTransfer::Forbidden)
throw NoSuchBinaryCacheFile("file '%s' does not exist in binary cache '%s'", path, getUri()); throw NoSuchBinaryCacheFile("file '%s' does not exist in binary cache '%s'", path, getUri());
maybeDisable(); maybeDisable();
throw; throw;
@ -142,12 +142,12 @@ protected:
auto callbackPtr = std::make_shared<decltype(callback)>(std::move(callback)); auto callbackPtr = std::make_shared<decltype(callback)>(std::move(callback));
getDownloader()->enqueueDataTransfer(request, getDataTransfer()->enqueueDataTransfer(request,
{[callbackPtr, this](std::future<DataTransferResult> result) { {[callbackPtr, this](std::future<DataTransferResult> result) {
try { try {
(*callbackPtr)(result.get().data); (*callbackPtr)(result.get().data);
} catch (DownloadError & e) { } catch (DownloadError & e) {
if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) if (e.error == DataTransfer::NotFound || e.error == DataTransfer::Forbidden)
return (*callbackPtr)(std::shared_ptr<std::string>()); return (*callbackPtr)(std::shared_ptr<std::string>());
maybeDisable(); maybeDisable();
callbackPtr->rethrow(); callbackPtr->rethrow();

View file

@ -182,7 +182,7 @@ static int _main(int argc, char * * argv)
DataTransferRequest req(actualUri); DataTransferRequest req(actualUri);
req.decompress = false; req.decompress = false;
getDownloader()->download(std::move(req), sink); getDataTransfer()->download(std::move(req), sink);
} }
/* Optionally unpack the file. */ /* Optionally unpack the file. */

View file

@ -139,7 +139,7 @@ struct CmdUpgradeNix : MixDryRun, StoreCommand
// FIXME: use nixos.org? // FIXME: use nixos.org?
auto req = DataTransferRequest(storePathsUrl); auto req = DataTransferRequest(storePathsUrl);
auto res = getDownloader()->download(req); auto res = getDataTransfer()->download(req);
auto state = std::make_unique<EvalState>(Strings(), store); auto state = std::make_unique<EvalState>(Strings(), store);
auto v = state->allocValue(); auto v = state->allocValue();