Merge changes I0220cedd,Ide0c0512,I6fcd920e,I85ec62ff,I35853a91, ... into main

* changes:
  libstore: check that transfer headers don't change during retries
  libstore: use effective transfer url for retries
  libstore: collect effective url and cachedness earlier
  libstore: remove TransferItem::active
  libstore: always allocate TransferItem::req
  libstore: remove FileTransferResult::data
  libstore: de-future-ize FileTransfer::enqueueUpload
  libstore: remove FileTransferRequest
  libstore: remove FileTransferRequest::expectedETag
  libstore: remove FileTransferResult::bodySize
  libstore: remove FileTransferRequest::verifyTLS
  libstore: remove FiletransferRequest::head
This commit is contained in:
eldritch horrors 2024-10-28 01:36:45 +00:00 committed by Gerrit Code Review
commit 61146c73ce
9 changed files with 236 additions and 168 deletions

View file

@ -209,7 +209,7 @@ DownloadFileResult downloadFile(
const std::string & url, const std::string & url,
const std::string & name, const std::string & name,
bool locked, bool locked,
const Headers & headers = {}); Headers headers = {});
struct DownloadTarballResult struct DownloadTarballResult
{ {

View file

@ -15,7 +15,7 @@ DownloadFileResult downloadFile(
const std::string & url, const std::string & url,
const std::string & name, const std::string & name,
bool locked, bool locked,
const Headers & headers) Headers headers)
{ {
// FIXME: check store // FIXME: check store
@ -40,13 +40,12 @@ DownloadFileResult downloadFile(
if (cached && !cached->expired) if (cached && !cached->expired)
return useCached(); return useCached();
FileTransferRequest request(url);
request.headers = headers;
if (cached) if (cached)
request.expectedETag = getStrAttr(cached->infoAttrs, "etag"); headers.emplace_back("If-None-Match", getStrAttr(cached->infoAttrs, "etag"));
FileTransferResult res; FileTransferResult res;
std::string data;
try { try {
res = getFileTransfer()->enqueueDownload(request).get(); std::tie(res, data) = getFileTransfer()->enqueueDownload(url, headers).get();
} catch (FileTransferError & e) { } catch (FileTransferError & e) {
if (cached) { if (cached) {
warn("%s; using cached version", e.msg()); warn("%s; using cached version", e.msg());
@ -71,8 +70,8 @@ DownloadFileResult downloadFile(
storePath = std::move(cached->storePath); storePath = std::move(cached->storePath);
} else { } else {
StringSink sink; StringSink sink;
sink << dumpString(res.data); sink << dumpString(data);
auto hash = hashString(HashType::SHA256, res.data); auto hash = hashString(HashType::SHA256, data);
ValidPathInfo info { ValidPathInfo info {
*store, *store,
name, name,

View file

@ -36,9 +36,7 @@ void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData,
auto fetch = [&](const std::string & url) { auto fetch = [&](const std::string & url) {
FileTransferRequest request(url); auto raw = fileTransfer->download(url);
auto raw = fileTransfer->download(std::move(request));
auto decompressor = makeDecompressionSource( auto decompressor = makeDecompressionSource(
unpack && mainUrl.ends_with(".xz") ? "xz" : "none", *raw); unpack && mainUrl.ends_with(".xz") ? "xz" : "none", *raw);

View file

@ -44,19 +44,33 @@ struct curlFileTransfer : public FileTransfer
struct TransferItem : public std::enable_shared_from_this<TransferItem> struct TransferItem : public std::enable_shared_from_this<TransferItem>
{ {
curlFileTransfer & fileTransfer; curlFileTransfer & fileTransfer;
FileTransferRequest request; std::string uri;
FileTransferResult result; FileTransferResult result;
Activity act; Activity act;
std::optional<std::string> uploadData; std::optional<std::string> uploadData;
bool done = false; // whether either the success or failure function has been called std::string downloadData;
std::packaged_task<FileTransferResult(std::exception_ptr, FileTransferResult)> callback; bool noBody = false; // \equiv HTTP HEAD, don't download data
enum {
/// nothing has been transferred yet
initialSetup,
/// at least some metadata has already been transferred,
/// but the transfer did not succeed and is now retrying
retrySetup,
/// data transfer in progress
transferring,
/// transfer complete, result or failure reported
transferComplete,
} phase = initialSetup;
std::packaged_task<
std::pair<FileTransferResult, std::string>(std::exception_ptr, FileTransferResult)>
callback;
std::function<void(TransferItem &, std::string_view data)> dataCallback; std::function<void(TransferItem &, std::string_view data)> dataCallback;
CURL * req = 0; CURL * req; // must never be nullptr
bool active = false; // whether the handle has been added to the multi object
std::string statusMsg; std::string statusMsg;
unsigned int attempt = 0; unsigned int attempt = 0;
const size_t tries = fileTransferSettings.tries; const size_t tries = fileTransferSettings.tries;
uint64_t bodySize = 0;
/* Don't start this download until the specified time point /* Don't start this download until the specified time point
has been reached. */ has been reached. */
@ -88,43 +102,45 @@ struct curlFileTransfer : public FileTransfer
} }
TransferItem(curlFileTransfer & fileTransfer, TransferItem(curlFileTransfer & fileTransfer,
const FileTransferRequest & request, const std::string & uri,
const Headers & headers,
ActivityId parentAct, ActivityId parentAct,
std::invocable<std::exception_ptr> auto callback, std::invocable<std::exception_ptr> auto callback,
std::function<void(TransferItem &, std::string_view data)> dataCallback, std::function<void(TransferItem &, std::string_view data)> dataCallback,
std::optional<std::string> uploadData std::optional<std::string> uploadData,
bool noBody
) )
: fileTransfer(fileTransfer) : fileTransfer(fileTransfer)
, request(request) , uri(uri)
, act(*logger, lvlTalkative, actFileTransfer, , act(*logger, lvlTalkative, actFileTransfer,
fmt(uploadData ? "uploading '%s'" : "downloading '%s'", request.uri), fmt(uploadData ? "uploading '%s'" : "downloading '%s'", uri),
{request.uri}, parentAct) {uri}, parentAct)
, uploadData(std::move(uploadData)) , uploadData(std::move(uploadData))
, callback([cb{std::move(callback)}] (std::exception_ptr ex, FileTransferResult r) { , noBody(noBody)
, callback([this, cb{std::move(callback)}] (std::exception_ptr ex, FileTransferResult r) {
cb(ex); cb(ex);
return r; return std::pair{std::move(r), std::move(downloadData)};
}) })
, dataCallback(std::move(dataCallback)) , dataCallback(std::move(dataCallback))
, req(curl_easy_init())
{ {
if (req == nullptr) {
throw FileTransferError(Misc, {}, "could not allocate curl handle");
}
requestHeaders = curl_slist_append(requestHeaders, "Accept-Encoding: zstd, br, gzip, deflate, bzip2, xz"); requestHeaders = curl_slist_append(requestHeaders, "Accept-Encoding: zstd, br, gzip, deflate, bzip2, xz");
if (!request.expectedETag.empty()) for (auto it = headers.begin(); it != headers.end(); ++it){
requestHeaders = curl_slist_append(requestHeaders, ("If-None-Match: " + request.expectedETag).c_str());
for (auto it = request.headers.begin(); it != request.headers.end(); ++it){
requestHeaders = curl_slist_append(requestHeaders, fmt("%s: %s", it->first, it->second).c_str()); requestHeaders = curl_slist_append(requestHeaders, fmt("%s: %s", it->first, it->second).c_str());
} }
} }
~TransferItem() ~TransferItem()
{ {
if (req) { curl_multi_remove_handle(fileTransfer.curlm, req);
if (active) curl_easy_cleanup(req);
curl_multi_remove_handle(fileTransfer.curlm, req);
curl_easy_cleanup(req);
}
if (requestHeaders) curl_slist_free_all(requestHeaders); if (requestHeaders) curl_slist_free_all(requestHeaders);
try { try {
if (!done) if (phase != transferComplete)
fail(FileTransferError(Interrupted, {}, "download of '%s' was interrupted", request.uri)); fail(FileTransferError(Interrupted, {}, "download of '%s' was interrupted", uri));
} catch (...) { } catch (...) {
ignoreExceptionInDestructor(); ignoreExceptionInDestructor();
} }
@ -132,8 +148,8 @@ struct curlFileTransfer : public FileTransfer
void failEx(std::exception_ptr ex) void failEx(std::exception_ptr ex)
{ {
assert(!done); assert(phase != transferComplete);
done = true; phase = transferComplete;
callback(ex, std::move(result)); callback(ex, std::move(result));
} }
@ -143,25 +159,55 @@ struct curlFileTransfer : public FileTransfer
failEx(std::make_exception_ptr(std::forward<T>(e))); failEx(std::make_exception_ptr(std::forward<T>(e)));
} }
std::exception_ptr writeException; [[noreturn]]
void throwChangedTarget(std::string_view what, std::string_view from, std::string_view to)
{
throw FileTransferError(
Misc, {}, "uri %s changed %s from %s to %s during transfer", uri, what, from, to
);
}
void maybeFinishSetup()
{
if (phase > retrySetup) {
return;
}
char * effectiveUriCStr = nullptr;
curl_easy_getinfo(req, CURLINFO_EFFECTIVE_URL, &effectiveUriCStr);
if (effectiveUriCStr) {
if (!result.effectiveUri.empty() && result.effectiveUri != effectiveUriCStr) {
throwChangedTarget("final destination", result.effectiveUri, effectiveUriCStr);
}
result.effectiveUri = effectiveUriCStr;
}
result.cached = getHTTPStatus() == 304;
phase = transferring;
}
std::exception_ptr callbackException;
size_t writeCallback(void * contents, size_t size, size_t nmemb) size_t writeCallback(void * contents, size_t size, size_t nmemb)
{ {
const size_t realSize = size * nmemb; const size_t realSize = size * nmemb;
try { try {
result.bodySize += realSize; maybeFinishSetup();
bodySize += realSize;
if (successfulStatuses.count(getHTTPStatus()) && this->dataCallback) { if (successfulStatuses.count(getHTTPStatus()) && this->dataCallback) {
writtenToSink += realSize; writtenToSink += realSize;
dataCallback(*this, {static_cast<const char *>(contents), realSize}); dataCallback(*this, {static_cast<const char *>(contents), realSize});
} else { } else {
this->result.data.append(static_cast<const char *>(contents), realSize); this->downloadData.append(static_cast<const char *>(contents), realSize);
} }
return realSize; return realSize;
} catch (...) { } catch (...) {
writeException = std::current_exception(); callbackException = std::current_exception();
return CURL_WRITEFUNC_ERROR; return CURL_WRITEFUNC_ERROR;
} }
} }
@ -172,30 +218,39 @@ struct curlFileTransfer : public FileTransfer
} }
size_t headerCallback(void * contents, size_t size, size_t nmemb) size_t headerCallback(void * contents, size_t size, size_t nmemb)
{ try {
size_t realSize = size * nmemb; size_t realSize = size * nmemb;
std::string line(static_cast<char *>(contents), realSize); std::string line(static_cast<char *>(contents), realSize);
printMsg(lvlVomit, "got header for '%s': %s", request.uri, trim(line)); printMsg(lvlVomit, "got header for '%s': %s", uri, trim(line));
static std::regex statusLine("HTTP/[^ ]+ +[0-9]+(.*)", std::regex::extended | std::regex::icase); static std::regex statusLine("HTTP/[^ ]+ +[0-9]+(.*)", std::regex::extended | std::regex::icase);
if (std::smatch match; std::regex_match(line, match, statusLine)) { if (std::smatch match; std::regex_match(line, match, statusLine)) {
result.etag = ""; downloadData.clear();
result.data.clear(); bodySize = 0;
result.bodySize = 0;
statusMsg = trim(match.str(1)); statusMsg = trim(match.str(1));
acceptRanges = false; acceptRanges = false;
encoding = "";
} else { } else {
auto i = line.find(':'); auto i = line.find(':');
if (i != std::string::npos) { if (i != std::string::npos) {
std::string name = toLower(trim(line.substr(0, i))); std::string name = toLower(trim(line.substr(0, i)));
if (name == "etag") { if (name == "etag") {
result.etag = trim(line.substr(i + 1)); // NOTE we don't check that the etag hasn't gone *missing*. technically
// this is not an error as long as we get the same data from the remote.
auto etag = trim(line.substr(i + 1));
if (!result.etag.empty() && result.etag != etag) {
throwChangedTarget("ETag", result.etag, etag);
}
result.etag = std::move(etag);
} }
else if (name == "content-encoding") else if (name == "content-encoding") {
encoding = trim(line.substr(i + 1)); auto encoding = trim(line.substr(i + 1));
if (!this->encoding.empty() && this->encoding != encoding) {
throwChangedTarget("encoding", this->encoding, encoding);
}
this->encoding = std::move(encoding);
}
else if (name == "accept-ranges" && toLower(trim(line.substr(i + 1))) == "bytes") else if (name == "accept-ranges" && toLower(trim(line.substr(i + 1))) == "bytes")
acceptRanges = true; acceptRanges = true;
@ -203,14 +258,20 @@ struct curlFileTransfer : public FileTransfer
else if (name == "link" || name == "x-amz-meta-link") { else if (name == "link" || name == "x-amz-meta-link") {
auto value = trim(line.substr(i + 1)); auto value = trim(line.substr(i + 1));
static std::regex linkRegex("<([^>]*)>; rel=\"immutable\"", std::regex::extended | std::regex::icase); static std::regex linkRegex("<([^>]*)>; rel=\"immutable\"", std::regex::extended | std::regex::icase);
if (std::smatch match; std::regex_match(value, match, linkRegex)) if (std::smatch match; std::regex_match(value, match, linkRegex)) {
if (result.immutableUrl && result.immutableUrl != match.str(1)) {
throwChangedTarget("immutable url", *result.immutableUrl, match.str(1));
}
result.immutableUrl = match.str(1); result.immutableUrl = match.str(1);
else } else
debug("got invalid link header '%s'", value); debug("got invalid link header '%s'", value);
} }
} }
} }
return realSize; return realSize;
} catch (...) {
callbackException = std::current_exception();
return CURL_WRITEFUNC_ERROR;
} }
static size_t headerCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp) static size_t headerCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp)
@ -262,7 +323,9 @@ struct curlFileTransfer : public FileTransfer
void init() void init()
{ {
if (!req) req = curl_easy_init(); if (phase > initialSetup) {
phase = retrySetup;
}
curl_easy_reset(req); curl_easy_reset(req);
@ -271,7 +334,11 @@ struct curlFileTransfer : public FileTransfer
curl_easy_setopt(req, CURLOPT_DEBUGFUNCTION, TransferItem::debugCallback); curl_easy_setopt(req, CURLOPT_DEBUGFUNCTION, TransferItem::debugCallback);
} }
curl_easy_setopt(req, CURLOPT_URL, request.uri.c_str()); // use the effective URI of the previous transfer for retries. this avoids
// some silent corruption if a redirect changes between starting and retry.
const auto & uri = result.effectiveUri.empty() ? this->uri : result.effectiveUri;
curl_easy_setopt(req, CURLOPT_URL, uri.c_str());
curl_easy_setopt(req, CURLOPT_FOLLOWLOCATION, 1L); curl_easy_setopt(req, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(req, CURLOPT_MAXREDIRS, 10); curl_easy_setopt(req, CURLOPT_MAXREDIRS, 10);
curl_easy_setopt(req, CURLOPT_NOSIGNAL, 1); curl_easy_setopt(req, CURLOPT_NOSIGNAL, 1);
@ -299,7 +366,7 @@ struct curlFileTransfer : public FileTransfer
if (settings.downloadSpeed.get() > 0) if (settings.downloadSpeed.get() > 0)
curl_easy_setopt(req, CURLOPT_MAX_RECV_SPEED_LARGE, (curl_off_t) (settings.downloadSpeed.get() * 1024)); curl_easy_setopt(req, CURLOPT_MAX_RECV_SPEED_LARGE, (curl_off_t) (settings.downloadSpeed.get() * 1024));
if (request.head) if (noBody)
curl_easy_setopt(req, CURLOPT_NOBODY, 1); curl_easy_setopt(req, CURLOPT_NOBODY, 1);
if (uploadData) { if (uploadData) {
@ -309,13 +376,8 @@ struct curlFileTransfer : public FileTransfer
curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) uploadData->length()); curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) uploadData->length());
} }
if (request.verifyTLS) { if (settings.caFile != "")
if (settings.caFile != "") curl_easy_setopt(req, CURLOPT_CAINFO, settings.caFile.get().c_str());
curl_easy_setopt(req, CURLOPT_CAINFO, settings.caFile.get().c_str());
} else {
curl_easy_setopt(req, CURLOPT_SSL_VERIFYPEER, 0);
curl_easy_setopt(req, CURLOPT_SSL_VERIFYHOST, 0);
}
curl_easy_setopt(req, CURLOPT_CONNECTTIMEOUT, fileTransferSettings.connectTimeout.get()); curl_easy_setopt(req, CURLOPT_CONNECTTIMEOUT, fileTransferSettings.connectTimeout.get());
@ -330,38 +392,34 @@ struct curlFileTransfer : public FileTransfer
if (writtenToSink) if (writtenToSink)
curl_easy_setopt(req, CURLOPT_RESUME_FROM_LARGE, writtenToSink); curl_easy_setopt(req, CURLOPT_RESUME_FROM_LARGE, writtenToSink);
result.data.clear(); downloadData.clear();
result.bodySize = 0; bodySize = 0;
} }
void finish(CURLcode code) void finish(CURLcode code)
{ {
auto httpStatus = getHTTPStatus(); auto httpStatus = getHTTPStatus();
char * effectiveUriCStr = nullptr; maybeFinishSetup();
curl_easy_getinfo(req, CURLINFO_EFFECTIVE_URL, &effectiveUriCStr);
if (effectiveUriCStr)
result.effectiveUri = effectiveUriCStr;
debug("finished %s of '%s'; curl status = %d, HTTP status = %d, body = %d bytes", debug("finished %s of '%s'; curl status = %d, HTTP status = %d, body = %d bytes",
verb(), request.uri, code, httpStatus, result.bodySize); verb(), uri, code, httpStatus, bodySize);
// this has to happen here until we can return an actual future. // this has to happen here until we can return an actual future.
// wrapping user `callback`s instead is not possible because the // wrapping user `callback`s instead is not possible because the
// Callback api expects std::functions, and copying Callbacks is // Callback api expects std::functions, and copying Callbacks is
// not possible due the promises they hold. // not possible due the promises they hold.
if (code == CURLE_OK && !dataCallback && result.data.length() > 0) { if (code == CURLE_OK && !dataCallback && downloadData.length() > 0) {
result.data = decompress(encoding, result.data); downloadData = decompress(encoding, downloadData);
} }
if (writeException) if (callbackException)
failEx(writeException); failEx(callbackException);
else if (code == CURLE_OK && successfulStatuses.count(httpStatus)) else if (code == CURLE_OK && successfulStatuses.count(httpStatus))
{ {
result.cached = httpStatus == 304; act.progress(bodySize, bodySize);
act.progress(result.bodySize, result.bodySize); phase = transferComplete;
done = true;
callback(nullptr, std::move(result)); callback(nullptr, std::move(result));
} }
@ -419,20 +477,20 @@ struct curlFileTransfer : public FileTransfer
std::optional<std::string> response; std::optional<std::string> response;
if (!successfulStatuses.count(httpStatus)) if (!successfulStatuses.count(httpStatus))
response = std::move(result.data); response = std::move(downloadData);
auto exc = auto exc =
code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted
? FileTransferError(Interrupted, std::move(response), "%s of '%s' was interrupted", verb(), request.uri) ? FileTransferError(Interrupted, std::move(response), "%s of '%s' was interrupted", verb(), uri)
: httpStatus != 0 : httpStatus != 0
? FileTransferError(err, ? FileTransferError(err,
std::move(response), std::move(response),
"unable to %s '%s': HTTP error %d (%s)%s", "unable to %s '%s': HTTP error %d (%s)%s",
verb(), request.uri, httpStatus, statusMsg, verb(), uri, httpStatus, statusMsg,
code == CURLE_OK ? "" : fmt(" (curl error: %s)", curl_easy_strerror(code))) code == CURLE_OK ? "" : fmt(" (curl error: %s)", curl_easy_strerror(code)))
: FileTransferError(err, : FileTransferError(err,
std::move(response), std::move(response),
"unable to %s '%s': %s (%d)", "unable to %s '%s': %s (%d)",
verb(), request.uri, curl_easy_strerror(code), code); verb(), uri, curl_easy_strerror(code), code);
/* If this is a transient error, then maybe retry the /* If this is a transient error, then maybe retry the
download after a while. If we're writing to a download after a while. If we're writing to a
@ -556,7 +614,6 @@ struct curlFileTransfer : public FileTransfer
assert(i != items.end()); assert(i != items.end());
i->second->finish(msg->data.result); i->second->finish(msg->data.result);
curl_multi_remove_handle(curlm, i->second->req); curl_multi_remove_handle(curlm, i->second->req);
i->second->active = false;
items.erase(i); items.erase(i);
} }
} }
@ -599,10 +656,9 @@ struct curlFileTransfer : public FileTransfer
} }
for (auto & item : incoming) { for (auto & item : incoming) {
debug("starting %s of %s", item->verb(), item->request.uri); debug("starting %s of %s", item->verb(), item->uri);
item->init(); item->init();
curl_multi_add_handle(curlm, item->req); curl_multi_add_handle(curlm, item->req);
item->active = true;
items[item->req] = item; items[item->req] = item;
} }
} }
@ -629,9 +685,9 @@ struct curlFileTransfer : public FileTransfer
std::shared_ptr<TransferItem> enqueueItem(std::shared_ptr<TransferItem> item) std::shared_ptr<TransferItem> enqueueItem(std::shared_ptr<TransferItem> item)
{ {
if (item->uploadData if (item->uploadData
&& !item->request.uri.starts_with("http://") && !item->uri.starts_with("http://")
&& !item->request.uri.starts_with("https://")) && !item->uri.starts_with("https://"))
throw nix::Error("uploading to '%s' is not supported", item->request.uri); throw nix::Error("uploading to '%s' is not supported", item->uri);
{ {
auto state(state_.lock()); auto state(state_.lock());
@ -659,77 +715,106 @@ struct curlFileTransfer : public FileTransfer
} }
#endif #endif
std::future<FileTransferResult> enqueueDownload(const FileTransferRequest & request) override std::future<std::pair<FileTransferResult, std::string>>
enqueueDownload(const std::string & uri, const Headers & headers = {}) override
{ {
return enqueueFileTransfer(request, std::nullopt); return enqueueFileTransfer(uri, headers, std::nullopt, false);
} }
std::future<FileTransferResult> void upload(const std::string & uri, std::string data, const Headers & headers) override
enqueueUpload(const FileTransferRequest & request, std::string data) override
{ {
return enqueueFileTransfer(request, std::move(data)); enqueueFileTransfer(uri, headers, std::move(data), false).get();
} }
std::future<FileTransferResult> std::future<std::pair<FileTransferResult, std::string>> enqueueFileTransfer(
enqueueFileTransfer(const FileTransferRequest & request, std::optional<std::string> data) const std::string & uri,
const Headers & headers,
std::optional<std::string> data,
bool noBody
)
{ {
return enqueueFileTransfer( return enqueueFileTransfer(
request, uri,
headers,
[](std::exception_ptr ex) { [](std::exception_ptr ex) {
if (ex) { if (ex) {
std::rethrow_exception(ex); std::rethrow_exception(ex);
} }
}, },
{}, {},
std::move(data) std::move(data),
noBody
); );
} }
std::future<FileTransferResult> enqueueFileTransfer(const FileTransferRequest & request, std::future<std::pair<FileTransferResult, std::string>> enqueueFileTransfer(
const std::string & uri,
const Headers & headers,
std::invocable<std::exception_ptr> auto callback, std::invocable<std::exception_ptr> auto callback,
std::function<void(TransferItem &, std::string_view data)> dataCallback, std::function<void(TransferItem &, std::string_view data)> dataCallback,
std::optional<std::string> data std::optional<std::string> data,
bool noBody
) )
{ {
/* Ugly hack to support s3:// URIs. */ /* Ugly hack to support s3:// URIs. */
if (request.uri.starts_with("s3://")) { if (uri.starts_with("s3://")) {
// FIXME: do this on a worker thread // FIXME: do this on a worker thread
return std::async(std::launch::deferred, [uri{request.uri}]() -> FileTransferResult { return std::async(
std::launch::deferred,
[uri]() -> std::pair<FileTransferResult, std::string> {
#if ENABLE_S3 #if ENABLE_S3
auto [bucketName, key, params] = parseS3Uri(uri); auto [bucketName, key, params] = parseS3Uri(uri);
std::string profile = getOr(params, "profile", ""); std::string profile = getOr(params, "profile", "");
std::string region = getOr(params, "region", Aws::Region::US_EAST_1); std::string region = getOr(params, "region", Aws::Region::US_EAST_1);
std::string scheme = getOr(params, "scheme", ""); std::string scheme = getOr(params, "scheme", "");
std::string endpoint = getOr(params, "endpoint", ""); std::string endpoint = getOr(params, "endpoint", "");
S3Helper s3Helper(profile, region, scheme, endpoint); S3Helper s3Helper(profile, region, scheme, endpoint);
// FIXME: implement ETag // FIXME: implement ETag
auto s3Res = s3Helper.getObject(bucketName, key); auto s3Res = s3Helper.getObject(bucketName, key);
FileTransferResult res; FileTransferResult res;
if (!s3Res.data) if (!s3Res.data)
throw FileTransferError(NotFound, "S3 object '%s' does not exist", uri); throw FileTransferError(NotFound, "S3 object '%s' does not exist", uri);
res.data = std::move(*s3Res.data); return {res, std::move(*s3Res.data)};
return res;
#else #else
throw nix::Error("cannot download '%s' because Lix is not built with S3 support", uri); throw nix::Error(
"cannot download '%s' because Lix is not built with S3 support", uri
);
#endif #endif
}); }
);
} }
return enqueueItem(std::make_shared<TransferItem>( return enqueueItem(std::make_shared<TransferItem>(
*this, *this,
request, uri,
headers,
getCurActivity(), getCurActivity(),
std::move(callback), std::move(callback),
std::move(dataCallback), std::move(dataCallback),
std::move(data) std::move(data),
noBody
)) ))
->callback.get_future(); ->callback.get_future();
} }
box_ptr<Source> download(FileTransferRequest && request) override bool exists(const std::string & uri, const Headers & headers) override
{
try {
enqueueFileTransfer(uri, headers, std::nullopt, true).get();
return true;
} catch (FileTransferError & e) {
/* S3 buckets return 403 if a file doesn't exist and the
bucket is unlistable, so treat 403 as 404. */
if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden)
return false;
throw;
}
}
box_ptr<Source> download(const std::string & uri, const Headers & headers) override
{ {
struct State { struct State {
bool done = false, failed = false; bool done = false, failed = false;
@ -741,7 +826,8 @@ struct curlFileTransfer : public FileTransfer
auto _state = std::make_shared<Sync<State>>(); auto _state = std::make_shared<Sync<State>>();
enqueueFileTransfer( enqueueFileTransfer(
request, uri,
headers,
[_state](std::exception_ptr ex) { [_state](std::exception_ptr ex) {
auto state(_state->lock()); auto state(_state->lock());
state->done = true; state->done = true;
@ -776,7 +862,8 @@ struct curlFileTransfer : public FileTransfer
state->data.append(data); state->data.append(data);
state->avail.notify_one(); state->avail.notify_one();
}, },
std::nullopt std::nullopt,
false
); );
struct InnerSource : Source struct InnerSource : Source

View file

@ -52,25 +52,11 @@ struct FileTransferSettings : Config
extern FileTransferSettings fileTransferSettings; extern FileTransferSettings fileTransferSettings;
struct FileTransferRequest
{
std::string uri;
Headers headers;
std::string expectedETag;
bool verifyTLS = true;
bool head = false;
FileTransferRequest(std::string_view uri)
: uri(uri) { }
};
struct FileTransferResult struct FileTransferResult
{ {
bool cached = false; bool cached = false;
std::string etag; std::string etag;
std::string effectiveUri; std::string effectiveUri;
std::string data;
uint64_t bodySize = 0;
/* An "immutable" URL for this resource (i.e. one whose contents /* An "immutable" URL for this resource (i.e. one whose contents
will never change), as returned by the `Link: <url>; will never change), as returned by the `Link: <url>;
rel="immutable"` header. */ rel="immutable"` header. */
@ -87,14 +73,26 @@ struct FileTransfer
* Enqueues a download request, returning a future for the result of * Enqueues a download request, returning a future for the result of
* the download. The future may throw a FileTransferError exception. * the download. The future may throw a FileTransferError exception.
*/ */
virtual std::future<FileTransferResult> enqueueDownload(const FileTransferRequest & request) = 0; virtual std::future<std::pair<FileTransferResult, std::string>>
enqueueDownload(const std::string & uri, const Headers & headers = {}) = 0;
/** /**
* Enqueue an upload request, returning a future for the result of * Upload some data. May throw a FileTransferError exception.
* the upload. The future may throw a FileTransferError exception.
*/ */
virtual std::future<FileTransferResult> virtual void
enqueueUpload(const FileTransferRequest & request, std::string data) = 0; upload(const std::string & uri, std::string data, const Headers & headers = {}) = 0;
/**
* Checks whether the given URI exists. For historical reasons this function
* treats HTTP 403 responses like HTTP 404 responses and returns `false` for
* both. This was originally done to handle unlistable S3 buckets, which may
* return 403 (not 404) if the reuqested object doesn't exist in the bucket.
*
* ## Bugs
*
* S3 objects are downloaded completely to answer this request.
*/
virtual bool exists(const std::string & uri, const Headers & headers = {}) = 0;
/** /**
* Download a file, returning its contents through a source. Will not return * Download a file, returning its contents through a source. Will not return
@ -103,7 +101,7 @@ struct FileTransfer
* thrown by the returned source. The source will only throw errors detected * thrown by the returned source. The source will only throw errors detected
* during the transfer itself (decompression errors, connection drops, etc). * during the transfer itself (decompression errors, connection drops, etc).
*/ */
virtual box_ptr<Source> download(FileTransferRequest && request) = 0; virtual box_ptr<Source> download(const std::string & uri, const Headers & headers = {}) = 0;
enum Error { NotFound, Forbidden, Misc, Transient, Interrupted }; enum Error { NotFound, Forbidden, Misc, Transient, Interrupted };
}; };

View file

@ -114,15 +114,8 @@ protected:
checkEnabled(); checkEnabled();
try { try {
FileTransferRequest request{makeURI(path)}; return getFileTransfer()->exists(makeURI(path));
request.head = true;
getFileTransfer()->enqueueDownload(request).get();
return true;
} catch (FileTransferError & e) { } catch (FileTransferError & e) {
/* S3 buckets return 403 if a file doesn't exist and the
bucket is unlistable, so treat 403 as 404. */
if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden)
return false;
maybeDisable(); maybeDisable();
throw; throw;
} }
@ -132,13 +125,13 @@ protected:
std::shared_ptr<std::basic_iostream<char>> istream, std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType) override const std::string & mimeType) override
{ {
FileTransferRequest req{makeURI(path)};
auto data = StreamToSourceAdapter(istream).drain(); auto data = StreamToSourceAdapter(istream).drain();
req.headers = {{"Content-Type", mimeType}};
try { try {
getFileTransfer()->enqueueUpload(req, std::move(data)).get(); getFileTransfer()->upload(makeURI(path), std::move(data), {{"Content-Type", mimeType}});
} catch (FileTransferError & e) { } catch (FileTransferError & e) {
throw UploadToHTTP("while uploading to HTTP binary cache at '%s': %s", cacheUri, e.msg()); throw UploadToHTTP(
"while uploading to HTTP binary cache at '%s': %s", cacheUri, e.msg()
);
} }
} }
@ -153,9 +146,8 @@ protected:
box_ptr<Source> getFile(const std::string & path) override box_ptr<Source> getFile(const std::string & path) override
{ {
checkEnabled(); checkEnabled();
FileTransferRequest request{makeURI(path)};
try { try {
return getFileTransfer()->download(std::move(request)); return getFileTransfer()->download(makeURI(path));
} catch (FileTransferError & e) { } catch (FileTransferError & e) {
if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden) if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden)
throw NoSuchBinaryCacheFile("file '%s' does not exist in binary cache '%s'", path, getUri()); throw NoSuchBinaryCacheFile("file '%s' does not exist in binary cache '%s'", path, getUri());
@ -168,10 +160,8 @@ protected:
{ {
checkEnabled(); checkEnabled();
FileTransferRequest request{makeURI(path)};
try { try {
return std::move(getFileTransfer()->enqueueDownload(request).get().data); return std::move(getFileTransfer()->enqueueDownload(makeURI(path)).get().second);
} catch (FileTransferError & e) { } catch (FileTransferError & e) {
if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden) if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden)
return {}; return {};

View file

@ -48,7 +48,7 @@ std::string resolveMirrorUrl(EvalState & state, const std::string & url)
std::tuple<StorePath, Hash> prefetchFile( std::tuple<StorePath, Hash> prefetchFile(
ref<Store> store, ref<Store> store,
std::string_view url, const std::string & url,
std::optional<std::string> name, std::optional<std::string> name,
HashType hashType, HashType hashType,
std::optional<Hash> expectedHash, std::optional<Hash> expectedHash,
@ -98,8 +98,7 @@ std::tuple<StorePath, Hash> prefetchFile(
FdSink sink(fd.get()); FdSink sink(fd.get());
FileTransferRequest req(url); getFileTransfer()->download(url)->drainInto(sink);
getFileTransfer()->download(std::move(req))->drainInto(sink);
} }
/* Optionally unpack the file. */ /* Optionally unpack the file. */

View file

@ -285,12 +285,11 @@ struct CmdUpgradeNix : MixDryRun, EvalCommand
Activity act(*logger, lvlInfo, actUnknown, "querying latest Nix version"); Activity act(*logger, lvlInfo, actUnknown, "querying latest Nix version");
// FIXME: use nixos.org? // FIXME: use nixos.org?
auto req = FileTransferRequest(storePathsUrl); auto [res, data] = getFileTransfer()->enqueueDownload(storePathsUrl).get();
auto res = getFileTransfer()->enqueueDownload(req).get();
auto state = std::make_unique<EvalState>(SearchPath{}, store); auto state = std::make_unique<EvalState>(SearchPath{}, store);
auto v = state->allocValue(); auto v = state->allocValue();
state->eval(state->parseExprFromString(res.data, state->rootPath(CanonPath("/no-such-path"))), *v); state->eval(state->parseExprFromString(data, state->rootPath(CanonPath("/no-such-path"))), *v);
Bindings & bindings(*state->allocBindings(0)); Bindings & bindings(*state->allocBindings(0));
auto v2 = findAlongAttrPath(*state, settings.thisSystem, bindings, *v).first; auto v2 = findAlongAttrPath(*state, settings.thisSystem, bindings, *v).first;

View file

@ -136,7 +136,7 @@ TEST(FileTransfer, exceptionAbortsDownload)
LambdaSink broken([](auto block) { throw Done(); }); LambdaSink broken([](auto block) { throw Done(); });
ASSERT_THROW(ft->download(FileTransferRequest("file:///dev/zero"))->drainInto(broken), Done); ASSERT_THROW(ft->download("file:///dev/zero")->drainInto(broken), Done);
// makeFileTransfer returns a ref<>, which cannot be cleared. since we also // makeFileTransfer returns a ref<>, which cannot be cleared. since we also
// can't default-construct it we'll have to overwrite it instead, but we'll // can't default-construct it we'll have to overwrite it instead, but we'll
@ -155,7 +155,7 @@ TEST(FileTransfer, exceptionAbortsRead)
auto [port, srv] = serveHTTP("200 ok", "content-length: 0\r\n", [] { return ""; }); auto [port, srv] = serveHTTP("200 ok", "content-length: 0\r\n", [] { return ""; });
auto ft = makeFileTransfer(); auto ft = makeFileTransfer();
char buf[10] = ""; char buf[10] = "";
ASSERT_THROW(ft->download(FileTransferRequest(fmt("http://[::1]:%d/index", port)))->read(buf, 10), EndOfFile); ASSERT_THROW(ft->download(fmt("http://[::1]:%d/index", port))->read(buf, 10), EndOfFile);
} }
TEST(FileTransfer, NOT_ON_DARWIN(reportsSetupErrors)) TEST(FileTransfer, NOT_ON_DARWIN(reportsSetupErrors))
@ -163,7 +163,7 @@ TEST(FileTransfer, NOT_ON_DARWIN(reportsSetupErrors))
auto [port, srv] = serveHTTP("404 not found", "", [] { return ""; }); auto [port, srv] = serveHTTP("404 not found", "", [] { return ""; });
auto ft = makeFileTransfer(); auto ft = makeFileTransfer();
ASSERT_THROW( ASSERT_THROW(
ft->enqueueDownload(FileTransferRequest(fmt("http://[::1]:%d/index", port))).get(), ft->enqueueDownload(fmt("http://[::1]:%d/index", port)).get(),
FileTransferError FileTransferError
); );
} }
@ -179,8 +179,7 @@ TEST(FileTransfer, NOT_ON_DARWIN(defersFailures))
return std::string(1024 * 1024, ' '); return std::string(1024 * 1024, ' ');
}); });
auto ft = makeFileTransfer(0); auto ft = makeFileTransfer(0);
FileTransferRequest req(fmt("http://[::1]:%d/index", port)); auto src = ft->download(fmt("http://[::1]:%d/index", port));
auto src = ft->download(std::move(req));
ASSERT_THROW(src->drain(), FileTransferError); ASSERT_THROW(src->drain(), FileTransferError);
} }
@ -193,7 +192,7 @@ TEST(FileTransfer, NOT_ON_DARWIN(handlesContentEncoding))
auto ft = makeFileTransfer(); auto ft = makeFileTransfer();
StringSink sink; StringSink sink;
ft->download(FileTransferRequest(fmt("http://[::1]:%d/index", port)))->drainInto(sink); ft->download(fmt("http://[::1]:%d/index", port))->drainInto(sink);
EXPECT_EQ(sink.s, original); EXPECT_EQ(sink.s, original);
} }
@ -216,8 +215,7 @@ TEST(FileTransfer, usesIntermediateLinkHeaders)
{"200 ok", "content-length: 1\r\n", [] { return "a"; }}, {"200 ok", "content-length: 1\r\n", [] { return "a"; }},
}); });
auto ft = makeFileTransfer(0); auto ft = makeFileTransfer(0);
FileTransferRequest req(fmt("http://[::1]:%d/first", port)); auto [result, _data] = ft->enqueueDownload(fmt("http://[::1]:%d/first", port)).get();
auto result = ft->enqueueDownload(req).get();
ASSERT_EQ(result.immutableUrl, "http://foo"); ASSERT_EQ(result.immutableUrl, "http://foo");
} }