libstore: remove FileTransferRequest
it's just a uri and some headers now. those can be function arguments
with no loss of clarity. *actual* additional arguments, for example a
TLS context with additional certificates, could be added on a new and
improved FileTransfer class that carries not just a backend reference
but some real, visible context for its transfers. curl not being very
multi-threading-friendly when using multi handles will make sharing a
bit hard anyway once we drop the single global download worker thread
Change-Id: Id2112c95cbd118c6d920488f38d272d7da926460
This commit is contained in:
parent
a839c31e6c
commit
6f18e1ebde
|
@ -209,7 +209,7 @@ DownloadFileResult downloadFile(
|
|||
const std::string & url,
|
||||
const std::string & name,
|
||||
bool locked,
|
||||
const Headers & headers = {});
|
||||
Headers headers = {});
|
||||
|
||||
struct DownloadTarballResult
|
||||
{
|
||||
|
|
|
@ -15,7 +15,7 @@ DownloadFileResult downloadFile(
|
|||
const std::string & url,
|
||||
const std::string & name,
|
||||
bool locked,
|
||||
const Headers & headers)
|
||||
Headers headers)
|
||||
{
|
||||
// FIXME: check store
|
||||
|
||||
|
@ -40,13 +40,11 @@ DownloadFileResult downloadFile(
|
|||
if (cached && !cached->expired)
|
||||
return useCached();
|
||||
|
||||
FileTransferRequest request(url);
|
||||
request.headers = headers;
|
||||
if (cached)
|
||||
request.headers.emplace_back("If-None-Match", getStrAttr(cached->infoAttrs, "etag"));
|
||||
headers.emplace_back("If-None-Match", getStrAttr(cached->infoAttrs, "etag"));
|
||||
FileTransferResult res;
|
||||
try {
|
||||
res = getFileTransfer()->enqueueDownload(request).get();
|
||||
res = getFileTransfer()->enqueueDownload(url, headers).get();
|
||||
} catch (FileTransferError & e) {
|
||||
if (cached) {
|
||||
warn("%s; using cached version", e.msg());
|
||||
|
|
|
@ -36,9 +36,7 @@ void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData,
|
|||
|
||||
auto fetch = [&](const std::string & url) {
|
||||
|
||||
FileTransferRequest request(url);
|
||||
|
||||
auto raw = fileTransfer->download(std::move(request));
|
||||
auto raw = fileTransfer->download(url);
|
||||
auto decompressor = makeDecompressionSource(
|
||||
unpack && mainUrl.ends_with(".xz") ? "xz" : "none", *raw);
|
||||
|
||||
|
|
|
@ -44,7 +44,7 @@ struct curlFileTransfer : public FileTransfer
|
|||
struct TransferItem : public std::enable_shared_from_this<TransferItem>
|
||||
{
|
||||
curlFileTransfer & fileTransfer;
|
||||
FileTransferRequest request;
|
||||
std::string uri;
|
||||
FileTransferResult result;
|
||||
Activity act;
|
||||
std::optional<std::string> uploadData;
|
||||
|
@ -90,7 +90,8 @@ struct curlFileTransfer : public FileTransfer
|
|||
}
|
||||
|
||||
TransferItem(curlFileTransfer & fileTransfer,
|
||||
const FileTransferRequest & request,
|
||||
const std::string & uri,
|
||||
const Headers & headers,
|
||||
ActivityId parentAct,
|
||||
std::invocable<std::exception_ptr> auto callback,
|
||||
std::function<void(TransferItem &, std::string_view data)> dataCallback,
|
||||
|
@ -98,10 +99,10 @@ struct curlFileTransfer : public FileTransfer
|
|||
bool noBody
|
||||
)
|
||||
: fileTransfer(fileTransfer)
|
||||
, request(request)
|
||||
, uri(uri)
|
||||
, act(*logger, lvlTalkative, actFileTransfer,
|
||||
fmt(uploadData ? "uploading '%s'" : "downloading '%s'", request.uri),
|
||||
{request.uri}, parentAct)
|
||||
fmt(uploadData ? "uploading '%s'" : "downloading '%s'", uri),
|
||||
{uri}, parentAct)
|
||||
, uploadData(std::move(uploadData))
|
||||
, noBody(noBody)
|
||||
, callback([cb{std::move(callback)}] (std::exception_ptr ex, FileTransferResult r) {
|
||||
|
@ -111,7 +112,7 @@ struct curlFileTransfer : public FileTransfer
|
|||
, dataCallback(std::move(dataCallback))
|
||||
{
|
||||
requestHeaders = curl_slist_append(requestHeaders, "Accept-Encoding: zstd, br, gzip, deflate, bzip2, xz");
|
||||
for (auto it = request.headers.begin(); it != request.headers.end(); ++it){
|
||||
for (auto it = headers.begin(); it != headers.end(); ++it){
|
||||
requestHeaders = curl_slist_append(requestHeaders, fmt("%s: %s", it->first, it->second).c_str());
|
||||
}
|
||||
}
|
||||
|
@ -126,7 +127,7 @@ struct curlFileTransfer : public FileTransfer
|
|||
if (requestHeaders) curl_slist_free_all(requestHeaders);
|
||||
try {
|
||||
if (!done)
|
||||
fail(FileTransferError(Interrupted, {}, "download of '%s' was interrupted", request.uri));
|
||||
fail(FileTransferError(Interrupted, {}, "download of '%s' was interrupted", uri));
|
||||
} catch (...) {
|
||||
ignoreExceptionInDestructor();
|
||||
}
|
||||
|
@ -177,7 +178,7 @@ struct curlFileTransfer : public FileTransfer
|
|||
{
|
||||
size_t realSize = size * nmemb;
|
||||
std::string line(static_cast<char *>(contents), realSize);
|
||||
printMsg(lvlVomit, "got header for '%s': %s", request.uri, trim(line));
|
||||
printMsg(lvlVomit, "got header for '%s': %s", uri, trim(line));
|
||||
|
||||
static std::regex statusLine("HTTP/[^ ]+ +[0-9]+(.*)", std::regex::extended | std::regex::icase);
|
||||
if (std::smatch match; std::regex_match(line, match, statusLine)) {
|
||||
|
@ -273,7 +274,7 @@ struct curlFileTransfer : public FileTransfer
|
|||
curl_easy_setopt(req, CURLOPT_DEBUGFUNCTION, TransferItem::debugCallback);
|
||||
}
|
||||
|
||||
curl_easy_setopt(req, CURLOPT_URL, request.uri.c_str());
|
||||
curl_easy_setopt(req, CURLOPT_URL, uri.c_str());
|
||||
curl_easy_setopt(req, CURLOPT_FOLLOWLOCATION, 1L);
|
||||
curl_easy_setopt(req, CURLOPT_MAXREDIRS, 10);
|
||||
curl_easy_setopt(req, CURLOPT_NOSIGNAL, 1);
|
||||
|
@ -341,7 +342,7 @@ struct curlFileTransfer : public FileTransfer
|
|||
result.effectiveUri = effectiveUriCStr;
|
||||
|
||||
debug("finished %s of '%s'; curl status = %d, HTTP status = %d, body = %d bytes",
|
||||
verb(), request.uri, code, httpStatus, bodySize);
|
||||
verb(), uri, code, httpStatus, bodySize);
|
||||
|
||||
// this has to happen here until we can return an actual future.
|
||||
// wrapping user `callback`s instead is not possible because the
|
||||
|
@ -419,17 +420,17 @@ struct curlFileTransfer : public FileTransfer
|
|||
response = std::move(result.data);
|
||||
auto exc =
|
||||
code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted
|
||||
? FileTransferError(Interrupted, std::move(response), "%s of '%s' was interrupted", verb(), request.uri)
|
||||
? FileTransferError(Interrupted, std::move(response), "%s of '%s' was interrupted", verb(), uri)
|
||||
: httpStatus != 0
|
||||
? FileTransferError(err,
|
||||
std::move(response),
|
||||
"unable to %s '%s': HTTP error %d (%s)%s",
|
||||
verb(), request.uri, httpStatus, statusMsg,
|
||||
verb(), uri, httpStatus, statusMsg,
|
||||
code == CURLE_OK ? "" : fmt(" (curl error: %s)", curl_easy_strerror(code)))
|
||||
: FileTransferError(err,
|
||||
std::move(response),
|
||||
"unable to %s '%s': %s (%d)",
|
||||
verb(), request.uri, curl_easy_strerror(code), code);
|
||||
verb(), uri, curl_easy_strerror(code), code);
|
||||
|
||||
/* If this is a transient error, then maybe retry the
|
||||
download after a while. If we're writing to a
|
||||
|
@ -596,7 +597,7 @@ struct curlFileTransfer : public FileTransfer
|
|||
}
|
||||
|
||||
for (auto & item : incoming) {
|
||||
debug("starting %s of %s", item->verb(), item->request.uri);
|
||||
debug("starting %s of %s", item->verb(), item->uri);
|
||||
item->init();
|
||||
curl_multi_add_handle(curlm, item->req);
|
||||
item->active = true;
|
||||
|
@ -626,9 +627,9 @@ struct curlFileTransfer : public FileTransfer
|
|||
std::shared_ptr<TransferItem> enqueueItem(std::shared_ptr<TransferItem> item)
|
||||
{
|
||||
if (item->uploadData
|
||||
&& !item->request.uri.starts_with("http://")
|
||||
&& !item->request.uri.starts_with("https://"))
|
||||
throw nix::Error("uploading to '%s' is not supported", item->request.uri);
|
||||
&& !item->uri.starts_with("http://")
|
||||
&& !item->uri.starts_with("https://"))
|
||||
throw nix::Error("uploading to '%s' is not supported", item->uri);
|
||||
|
||||
{
|
||||
auto state(state_.lock());
|
||||
|
@ -656,23 +657,28 @@ struct curlFileTransfer : public FileTransfer
|
|||
}
|
||||
#endif
|
||||
|
||||
std::future<FileTransferResult> enqueueDownload(const FileTransferRequest & request) override
|
||||
std::future<FileTransferResult>
|
||||
enqueueDownload(const std::string & uri, const Headers & headers = {}) override
|
||||
{
|
||||
return enqueueFileTransfer(request, std::nullopt, false);
|
||||
return enqueueFileTransfer(uri, headers, std::nullopt, false);
|
||||
}
|
||||
|
||||
std::future<FileTransferResult>
|
||||
enqueueUpload(const FileTransferRequest & request, std::string data) override
|
||||
enqueueUpload(const std::string & uri, std::string data, const Headers & headers) override
|
||||
{
|
||||
return enqueueFileTransfer(request, std::move(data), false);
|
||||
return enqueueFileTransfer(uri, headers, std::move(data), false);
|
||||
}
|
||||
|
||||
std::future<FileTransferResult> enqueueFileTransfer(
|
||||
const FileTransferRequest & request, std::optional<std::string> data, bool noBody
|
||||
const std::string & uri,
|
||||
const Headers & headers,
|
||||
std::optional<std::string> data,
|
||||
bool noBody
|
||||
)
|
||||
{
|
||||
return enqueueFileTransfer(
|
||||
request,
|
||||
uri,
|
||||
headers,
|
||||
[](std::exception_ptr ex) {
|
||||
if (ex) {
|
||||
std::rethrow_exception(ex);
|
||||
|
@ -684,7 +690,9 @@ struct curlFileTransfer : public FileTransfer
|
|||
);
|
||||
}
|
||||
|
||||
std::future<FileTransferResult> enqueueFileTransfer(const FileTransferRequest & request,
|
||||
std::future<FileTransferResult> enqueueFileTransfer(
|
||||
const std::string & uri,
|
||||
const Headers & headers,
|
||||
std::invocable<std::exception_ptr> auto callback,
|
||||
std::function<void(TransferItem &, std::string_view data)> dataCallback,
|
||||
std::optional<std::string> data,
|
||||
|
@ -692,9 +700,9 @@ struct curlFileTransfer : public FileTransfer
|
|||
)
|
||||
{
|
||||
/* Ugly hack to support s3:// URIs. */
|
||||
if (request.uri.starts_with("s3://")) {
|
||||
if (uri.starts_with("s3://")) {
|
||||
// FIXME: do this on a worker thread
|
||||
return std::async(std::launch::deferred, [uri{request.uri}]() -> FileTransferResult {
|
||||
return std::async(std::launch::deferred, [uri]() -> FileTransferResult {
|
||||
#if ENABLE_S3
|
||||
auto [bucketName, key, params] = parseS3Uri(uri);
|
||||
|
||||
|
@ -720,7 +728,8 @@ struct curlFileTransfer : public FileTransfer
|
|||
|
||||
return enqueueItem(std::make_shared<TransferItem>(
|
||||
*this,
|
||||
request,
|
||||
uri,
|
||||
headers,
|
||||
getCurActivity(),
|
||||
std::move(callback),
|
||||
std::move(dataCallback),
|
||||
|
@ -730,10 +739,10 @@ struct curlFileTransfer : public FileTransfer
|
|||
->callback.get_future();
|
||||
}
|
||||
|
||||
bool exists(std::string_view uri) override
|
||||
bool exists(const std::string & uri, const Headers & headers) override
|
||||
{
|
||||
try {
|
||||
enqueueFileTransfer(FileTransferRequest{uri}, std::nullopt, true).get();
|
||||
enqueueFileTransfer(uri, headers, std::nullopt, true).get();
|
||||
return true;
|
||||
} catch (FileTransferError & e) {
|
||||
/* S3 buckets return 403 if a file doesn't exist and the
|
||||
|
@ -744,7 +753,7 @@ struct curlFileTransfer : public FileTransfer
|
|||
}
|
||||
}
|
||||
|
||||
box_ptr<Source> download(FileTransferRequest && request) override
|
||||
box_ptr<Source> download(const std::string & uri, const Headers & headers) override
|
||||
{
|
||||
struct State {
|
||||
bool done = false, failed = false;
|
||||
|
@ -756,7 +765,8 @@ struct curlFileTransfer : public FileTransfer
|
|||
auto _state = std::make_shared<Sync<State>>();
|
||||
|
||||
enqueueFileTransfer(
|
||||
request,
|
||||
uri,
|
||||
headers,
|
||||
[_state](std::exception_ptr ex) {
|
||||
auto state(_state->lock());
|
||||
state->done = true;
|
||||
|
|
|
@ -52,15 +52,6 @@ struct FileTransferSettings : Config
|
|||
|
||||
extern FileTransferSettings fileTransferSettings;
|
||||
|
||||
struct FileTransferRequest
|
||||
{
|
||||
std::string uri;
|
||||
Headers headers;
|
||||
|
||||
FileTransferRequest(std::string_view uri)
|
||||
: uri(uri) { }
|
||||
};
|
||||
|
||||
struct FileTransferResult
|
||||
{
|
||||
bool cached = false;
|
||||
|
@ -83,14 +74,14 @@ struct FileTransfer
|
|||
* Enqueues a download request, returning a future for the result of
|
||||
* the download. The future may throw a FileTransferError exception.
|
||||
*/
|
||||
virtual std::future<FileTransferResult> enqueueDownload(const FileTransferRequest & request) = 0;
|
||||
virtual std::future<FileTransferResult>
|
||||
enqueueDownload(const std::string & uri, const Headers & headers = {}) = 0;
|
||||
|
||||
/**
|
||||
* Enqueue an upload request, returning a future for the result of
|
||||
* the upload. The future may throw a FileTransferError exception.
|
||||
* Upload some data. May throw a FileTransferError exception.
|
||||
*/
|
||||
virtual std::future<FileTransferResult>
|
||||
enqueueUpload(const FileTransferRequest & request, std::string data) = 0;
|
||||
enqueueUpload(const std::string & uri, std::string data, const Headers & headers = {}) = 0;
|
||||
|
||||
/**
|
||||
* Checks whether the given URI exists. For historical reasons this function
|
||||
|
@ -102,7 +93,7 @@ struct FileTransfer
|
|||
*
|
||||
* S3 objects are downloaded completely to answer this request.
|
||||
*/
|
||||
virtual bool exists(std::string_view uri) = 0;
|
||||
virtual bool exists(const std::string & uri, const Headers & headers = {}) = 0;
|
||||
|
||||
/**
|
||||
* Download a file, returning its contents through a source. Will not return
|
||||
|
@ -111,7 +102,7 @@ struct FileTransfer
|
|||
* thrown by the returned source. The source will only throw errors detected
|
||||
* during the transfer itself (decompression errors, connection drops, etc).
|
||||
*/
|
||||
virtual box_ptr<Source> download(FileTransferRequest && request) = 0;
|
||||
virtual box_ptr<Source> download(const std::string & uri, const Headers & headers = {}) = 0;
|
||||
|
||||
enum Error { NotFound, Forbidden, Misc, Transient, Interrupted };
|
||||
};
|
||||
|
|
|
@ -125,13 +125,15 @@ protected:
|
|||
std::shared_ptr<std::basic_iostream<char>> istream,
|
||||
const std::string & mimeType) override
|
||||
{
|
||||
FileTransferRequest req{makeURI(path)};
|
||||
auto data = StreamToSourceAdapter(istream).drain();
|
||||
req.headers = {{"Content-Type", mimeType}};
|
||||
try {
|
||||
getFileTransfer()->enqueueUpload(req, std::move(data)).get();
|
||||
getFileTransfer()
|
||||
->enqueueUpload(makeURI(path), std::move(data), {{"Content-Type", mimeType}})
|
||||
.get();
|
||||
} catch (FileTransferError & e) {
|
||||
throw UploadToHTTP("while uploading to HTTP binary cache at '%s': %s", cacheUri, e.msg());
|
||||
throw UploadToHTTP(
|
||||
"while uploading to HTTP binary cache at '%s': %s", cacheUri, e.msg()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -146,9 +148,8 @@ protected:
|
|||
box_ptr<Source> getFile(const std::string & path) override
|
||||
{
|
||||
checkEnabled();
|
||||
FileTransferRequest request{makeURI(path)};
|
||||
try {
|
||||
return getFileTransfer()->download(std::move(request));
|
||||
return getFileTransfer()->download(makeURI(path));
|
||||
} catch (FileTransferError & e) {
|
||||
if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden)
|
||||
throw NoSuchBinaryCacheFile("file '%s' does not exist in binary cache '%s'", path, getUri());
|
||||
|
@ -161,10 +162,8 @@ protected:
|
|||
{
|
||||
checkEnabled();
|
||||
|
||||
FileTransferRequest request{makeURI(path)};
|
||||
|
||||
try {
|
||||
return std::move(getFileTransfer()->enqueueDownload(request).get().data);
|
||||
return std::move(getFileTransfer()->enqueueDownload(makeURI(path)).get().data);
|
||||
} catch (FileTransferError & e) {
|
||||
if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden)
|
||||
return {};
|
||||
|
|
|
@ -48,7 +48,7 @@ std::string resolveMirrorUrl(EvalState & state, const std::string & url)
|
|||
|
||||
std::tuple<StorePath, Hash> prefetchFile(
|
||||
ref<Store> store,
|
||||
std::string_view url,
|
||||
const std::string & url,
|
||||
std::optional<std::string> name,
|
||||
HashType hashType,
|
||||
std::optional<Hash> expectedHash,
|
||||
|
@ -98,8 +98,7 @@ std::tuple<StorePath, Hash> prefetchFile(
|
|||
|
||||
FdSink sink(fd.get());
|
||||
|
||||
FileTransferRequest req(url);
|
||||
getFileTransfer()->download(std::move(req))->drainInto(sink);
|
||||
getFileTransfer()->download(url)->drainInto(sink);
|
||||
}
|
||||
|
||||
/* Optionally unpack the file. */
|
||||
|
|
|
@ -285,8 +285,7 @@ struct CmdUpgradeNix : MixDryRun, EvalCommand
|
|||
Activity act(*logger, lvlInfo, actUnknown, "querying latest Nix version");
|
||||
|
||||
// FIXME: use nixos.org?
|
||||
auto req = FileTransferRequest(storePathsUrl);
|
||||
auto res = getFileTransfer()->enqueueDownload(req).get();
|
||||
auto res = getFileTransfer()->enqueueDownload(storePathsUrl).get();
|
||||
|
||||
auto state = std::make_unique<EvalState>(SearchPath{}, store);
|
||||
auto v = state->allocValue();
|
||||
|
|
|
@ -136,7 +136,7 @@ TEST(FileTransfer, exceptionAbortsDownload)
|
|||
|
||||
LambdaSink broken([](auto block) { throw Done(); });
|
||||
|
||||
ASSERT_THROW(ft->download(FileTransferRequest("file:///dev/zero"))->drainInto(broken), Done);
|
||||
ASSERT_THROW(ft->download("file:///dev/zero")->drainInto(broken), Done);
|
||||
|
||||
// makeFileTransfer returns a ref<>, which cannot be cleared. since we also
|
||||
// can't default-construct it we'll have to overwrite it instead, but we'll
|
||||
|
@ -155,7 +155,7 @@ TEST(FileTransfer, exceptionAbortsRead)
|
|||
auto [port, srv] = serveHTTP("200 ok", "content-length: 0\r\n", [] { return ""; });
|
||||
auto ft = makeFileTransfer();
|
||||
char buf[10] = "";
|
||||
ASSERT_THROW(ft->download(FileTransferRequest(fmt("http://[::1]:%d/index", port)))->read(buf, 10), EndOfFile);
|
||||
ASSERT_THROW(ft->download(fmt("http://[::1]:%d/index", port))->read(buf, 10), EndOfFile);
|
||||
}
|
||||
|
||||
TEST(FileTransfer, NOT_ON_DARWIN(reportsSetupErrors))
|
||||
|
@ -163,7 +163,7 @@ TEST(FileTransfer, NOT_ON_DARWIN(reportsSetupErrors))
|
|||
auto [port, srv] = serveHTTP("404 not found", "", [] { return ""; });
|
||||
auto ft = makeFileTransfer();
|
||||
ASSERT_THROW(
|
||||
ft->enqueueDownload(FileTransferRequest(fmt("http://[::1]:%d/index", port))).get(),
|
||||
ft->enqueueDownload(fmt("http://[::1]:%d/index", port)).get(),
|
||||
FileTransferError
|
||||
);
|
||||
}
|
||||
|
@ -179,8 +179,7 @@ TEST(FileTransfer, NOT_ON_DARWIN(defersFailures))
|
|||
return std::string(1024 * 1024, ' ');
|
||||
});
|
||||
auto ft = makeFileTransfer(0);
|
||||
FileTransferRequest req(fmt("http://[::1]:%d/index", port));
|
||||
auto src = ft->download(std::move(req));
|
||||
auto src = ft->download(fmt("http://[::1]:%d/index", port));
|
||||
ASSERT_THROW(src->drain(), FileTransferError);
|
||||
}
|
||||
|
||||
|
@ -193,7 +192,7 @@ TEST(FileTransfer, NOT_ON_DARWIN(handlesContentEncoding))
|
|||
auto ft = makeFileTransfer();
|
||||
|
||||
StringSink sink;
|
||||
ft->download(FileTransferRequest(fmt("http://[::1]:%d/index", port)))->drainInto(sink);
|
||||
ft->download(fmt("http://[::1]:%d/index", port))->drainInto(sink);
|
||||
EXPECT_EQ(sink.s, original);
|
||||
}
|
||||
|
||||
|
@ -216,8 +215,7 @@ TEST(FileTransfer, usesIntermediateLinkHeaders)
|
|||
{"200 ok", "content-length: 1\r\n", [] { return "a"; }},
|
||||
});
|
||||
auto ft = makeFileTransfer(0);
|
||||
FileTransferRequest req(fmt("http://[::1]:%d/first", port));
|
||||
auto result = ft->enqueueDownload(req).get();
|
||||
auto result = ft->enqueueDownload(fmt("http://[::1]:%d/first", port)).get();
|
||||
ASSERT_EQ(result.immutableUrl, "http://foo");
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue