lix/src/libstore/filetransfer.cc

874 lines
32 KiB
C++
Raw Normal View History

#include "filetransfer.hh"
2015-04-09 10:12:50 +00:00
#include "util.hh"
#include "globals.hh"
#include "store-api.hh"
#include "s3.hh"
#include "compression.hh"
#include "finally.hh"
#include "callback.hh"
2021-04-23 06:30:05 +00:00
#if ENABLE_S3
2017-03-03 21:12:17 +00:00
#include <aws/core/client/ClientConfiguration.h>
#endif
2015-04-09 10:12:50 +00:00
#include <unistd.h>
#include <fcntl.h>
2015-04-09 10:12:50 +00:00
#include <curl/curl.h>
2018-01-31 21:34:19 +00:00
#include <algorithm>
#include <cmath>
2018-01-31 21:34:19 +00:00
#include <cstring>
#include <iostream>
#include <queue>
#include <random>
2018-01-31 21:34:19 +00:00
#include <thread>
2020-06-17 13:18:10 +00:00
#include <regex>
2015-10-07 15:31:50 +00:00
using namespace std::string_literals;
2015-04-09 10:12:50 +00:00
namespace nix {
2020-04-06 21:43:43 +00:00
/* Process-wide file transfer settings. The static `Register` object is
   constructed with the address of these settings; presumably that is
   what makes them visible to the global configuration (confirm against
   GlobalConfig). */
FileTransferSettings fileTransferSettings;
static GlobalConfig::Register rFileTransferSettings(&fileTransferSettings);
2019-11-06 16:30:48 +00:00
2020-04-06 21:43:43 +00:00
struct curlFileTransfer : public FileTransfer
2015-04-09 10:12:50 +00:00
{
/* The curl multi handle; created in the constructor and cleaned up in
   the destructor. */
CURLM * curlm = nullptr;

/* Random number generator (seeded from `rd` in the constructor) used
   to add jitter to retry delays. */
std::random_device rd;
std::mt19937 mt19937;
2020-04-06 20:59:36 +00:00
struct TransferItem : public std::enable_shared_from_this<TransferItem>
2015-04-09 10:12:50 +00:00
{
2020-04-06 21:43:43 +00:00
curlFileTransfer & fileTransfer;
FileTransferRequest request;
FileTransferResult result;
2017-05-16 14:09:57 +00:00
Activity act;
bool done = false; // whether either the success or failure function has been called
2020-04-06 21:43:43 +00:00
Callback<FileTransferResult> callback;
CURL * req = 0;
bool active = false; // whether the handle has been added to the multi object
std::string statusMsg;
unsigned int attempt = 0;
/* Don't start this download until the specified time point
has been reached. */
std::chrono::steady_clock::time_point embargo;
struct curl_slist * requestHeaders = 0;
std::string encoding;
bool acceptRanges = false;
curl_off_t writtenToSink = 0;
2020-06-17 16:58:59 +00:00
inline static const std::set<long> successfulStatuses {200, 201, 204, 206, 304, 0 /* other protocol */};
2020-06-16 09:51:34 +00:00
/* Return the HTTP response code of this transfer. Transfers that are
   not using HTTP or HTTPS report 0. */
long getHTTPStatus()
{
    long scheme = 0;
    curl_easy_getinfo(req, CURLINFO_PROTOCOL, &scheme);

    const bool isHttp = scheme == CURLPROTO_HTTP || scheme == CURLPROTO_HTTPS;
    if (!isHttp)
        return 0;

    long status = 0;
    curl_easy_getinfo(req, CURLINFO_RESPONSE_CODE, &status);
    return status;
}
2020-04-06 21:43:43 +00:00
TransferItem(curlFileTransfer & fileTransfer,
const FileTransferRequest & request,
Callback<FileTransferResult> && callback)
: fileTransfer(fileTransfer)
, request(request)
2020-04-06 21:43:43 +00:00
, act(*logger, lvlTalkative, actFileTransfer,
fmt(request.data ? "uploading '%s'" : "downloading '%s'", request.uri),
{request.uri}, request.parentAct)
, callback(std::move(callback))
2020-12-02 13:00:43 +00:00
, finalSink([this](std::string_view data) {
if (errorSink) {
(*errorSink)(data);
}
if (this->request.dataCallback) {
2020-06-16 09:51:34 +00:00
auto httpStatus = getHTTPStatus();
/* Only write data to the sink if this is a
successful response. */
2020-06-17 16:58:59 +00:00
if (successfulStatuses.count(httpStatus)) {
2020-12-02 13:00:43 +00:00
writtenToSink += data.size();
this->request.dataCallback(data);
}
} else
this->result.data.append(data);
})
{
advertise transport encoding in http transfers to tl;dr: With this 1 line change I was able to get a speedup of 1.5x on 1Gbit/s wan connections by enabling zstd compression in nginx. Also nix already supported all common compression format for http transfer, webservers usually only enable them if they are advertised through the Accept-Encoding header. This pull requests makes nix advertises content compression support for zstd, br, gzip and deflate. It's particular useful to add transparent compression for binary caches that serve packages from the host nix store in particular nix-serve, nix-serve-ng and harmonia. I tried so far gzip, brotli and zstd, whereas only zstd was able to bring me performance improvements for 1Gbit/s WAN connections. The following nginx configuration was used in combination with the [zstd module](https://github.com/tokers/zstd-nginx-module) and [harmonia](https://github.com/nix-community/harmonia/) ```nix { services.nginx.virtualHosts."cache.yourhost.com" = { locations."/".extraConfig = '' proxy_pass http://127.0.0.1:5000; proxy_set_header Host $host; proxy_redirect http:// https://; proxy_http_version 1.1; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection $connection_upgrade; zstd on; zstd_types application/x-nix-archive; ''; }; } ``` For testing I unpacked a linux kernel tarball to the nix store using this command `nix-prefetch-url --unpack https://cdn.kernel.org/pub/linux/kernel/v6.x/linux-6.1.8.tar.gz`. 
Before: ```console $ nix build && rm -rf /tmp/hello && time ./result/bin/nix copy --no-check-sigs --from https://cache.thalheim.io --to 'file:///tmp/hello?compression=none' '/nix/store/j42mahch5f0jvfmayhzwbb88sw36fvah-linux-6.1.8.tar.gz' warning: Git tree '/scratch/joerg/nix' is dirty real 0m18,375s user 0m2,889s sys 0m1,558s ``` After: ```console $ nix build && rm -rf /tmp/hello && time ./result/bin/nix copy --no-check-sigs --from https://cache.thalheim.io --to 'file:///tmp/hello?compression=none' '/nix/store/j42mahch5f0jvfmayhzwb b88sw36fvah-linux-6.1.8.tar.gz' real 0m11,884s user 0m4,130s sys 0m1,439s ``` Signed-off-by: Jörg Thalheim <joerg@thalheim.io> Update src/libstore/filetransfer.cc Co-authored-by: Théophane Hufschmitt <7226587+thufschmitt@users.noreply.github.com>
2023-01-29 14:57:15 +00:00
requestHeaders = curl_slist_append(requestHeaders, "Accept-Encoding: zstd, br, gzip, deflate, bzip2, xz");
if (!request.expectedETag.empty())
requestHeaders = curl_slist_append(requestHeaders, ("If-None-Match: " + request.expectedETag).c_str());
if (!request.mimeType.empty())
requestHeaders = curl_slist_append(requestHeaders, ("Content-Type: " + request.mimeType).c_str());
for (auto it = request.headers.begin(); it != request.headers.end(); ++it){
requestHeaders = curl_slist_append(requestHeaders, fmt("%s: %s", it->first, it->second).c_str());
}
}
2015-04-09 10:12:50 +00:00
2020-04-06 20:59:36 +00:00
/* Release the curl handle and the header list. If neither the success
   nor the failure path has run yet, report the transfer as
   interrupted. */
~TransferItem()
{
    if (req) {
        /* The handle must be detached from the multi object before
           it is cleaned up. */
        if (active)
            curl_multi_remove_handle(fileTransfer.curlm, req);
        curl_easy_cleanup(req);
    }
    if (requestHeaders) curl_slist_free_all(requestHeaders);
    try {
        if (!done)
            fail(FileTransferError(Interrupted, {}, "download of '%s' was interrupted", request.uri));
    } catch (...) {
        /* Never let an exception escape the destructor. */
        ignoreException();
    }
}
/* Report failure `ex` through the callback. Must be called at most
   once, and only if the success path has not run (asserted via
   `done`). */
void failEx(std::exception_ptr ex)
{
assert(!done);
// Mark completion before invoking the callback.
done = true;
callback.rethrow(ex);
}
/* Report failure with exception object `e`. `e` is a forwarding
   reference, so it is forwarded rather than unconditionally moved:
   rvalues are moved into the exception_ptr, lvalues are copied and
   left intact for the caller. */
template<class T>
void fail(T && e)
{
    failEx(std::make_exception_ptr(std::forward<T>(e)));
}
LambdaSink finalSink;
2019-12-10 08:47:38 +00:00
std::shared_ptr<FinishSink> decompressionSink;
std::optional<StringSink> errorSink;
std::exception_ptr writeException;
/* Handle a chunk of response body from curl.

   Returns the number of bytes consumed. If anything throws, the
   exception is stored in `writeException` and 0 is returned; finish()
   later reports the stored exception. */
size_t writeCallback(void * contents, size_t size, size_t nmemb)
{
    try {
        size_t realSize = size * nmemb;
        result.bodySize += realSize;

        if (!decompressionSink) {
            decompressionSink = makeDecompressionSink(encoding, finalSink);
            if (! successfulStatuses.count(getHTTPStatus())) {
                // In this case we want to construct a TeeSink, to keep
                // the response around (which we figure won't be big
                // like an actual download should be) to improve error
                // messages.
                errorSink = StringSink { };
            }
        }

        (*decompressionSink)({static_cast<char *>(contents), realSize});

        return realSize;
    } catch (...) {
        writeException = std::current_exception();
        return 0;
    }
}
static size_t writeCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp)
{
2020-04-06 20:59:36 +00:00
return ((TransferItem *) userp)->writeCallback(contents, size, nmemb);
}
size_t headerCallback(void * contents, size_t size, size_t nmemb)
{
size_t realSize = size * nmemb;
std::string line((char *) contents, realSize);
printMsg(lvlVomit, "got header for '%s': %s", request.uri, trim(line));
static std::regex statusLine("HTTP/[^ ]+ +[0-9]+(.*)", std::regex::extended | std::regex::icase);
std::smatch match;
if (std::regex_match(line, match, statusLine)) {
result.etag = "";
result.data.clear();
result.bodySize = 0;
statusMsg = trim(match.str(1));
acceptRanges = false;
encoding = "";
} else {
auto i = line.find(':');
if (i != std::string::npos) {
std::string name = toLower(trim(line.substr(0, i)));
if (name == "etag") {
result.etag = trim(line.substr(i + 1));
/* Hack to work around a GitHub bug: it sends
ETags, but ignores If-None-Match. So if we get
the expected ETag on a 200 response, then shut
down the connection because we already have the
data. */
long httpStatus = 0;
curl_easy_getinfo(req, CURLINFO_RESPONSE_CODE, &httpStatus);
if (result.etag == request.expectedETag && httpStatus == 200) {
debug("shutting down on 200 HTTP response with expected ETag");
return 0;
}
} else if (name == "content-encoding")
encoding = trim(line.substr(i + 1));
else if (name == "accept-ranges" && toLower(trim(line.substr(i + 1))) == "bytes")
acceptRanges = true;
2015-04-09 10:12:50 +00:00
}
}
return realSize;
2015-04-09 10:12:50 +00:00
}
static size_t headerCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp)
{
2020-04-06 20:59:36 +00:00
return ((TransferItem *) userp)->headerCallback(contents, size, nmemb);
}
int progressCallback(double dltotal, double dlnow)
{
try {
act.progress(dlnow, dltotal);
} catch (nix::Interrupted &) {
assert(_isInterrupted);
}
return _isInterrupted;
}
static int progressCallbackWrapper(void * userp, double dltotal, double dlnow, double ultotal, double ulnow)
{
2020-04-06 20:59:36 +00:00
return ((TransferItem *) userp)->progressCallback(dltotal, dlnow);
}
/* Debug hook for curl (installed when verbosity is lvlVomit or
   higher): logs curl's informational text and ignores every other
   kind of debug data. Always returns 0. */
static int debugCallback(CURL * handle, curl_infotype type, char * data, size_t size, void * userptr)
{
    if (type != CURLINFO_TEXT)
        return 0;

    vomit("curl: %s", chomp(std::string(data, size)));
    return 0;
}
/* Number of bytes of request.data already handed to curl. */
size_t readOffset = 0;

/* Supply curl with the next piece of the upload body. Copies at most
   `size * nitems` bytes into `buffer` and returns the number of bytes
   copied; returns 0 once all of request.data has been sent. */
size_t readCallback(char *buffer, size_t size, size_t nitems)
{
    const auto remaining = request.data->length() - readOffset;
    if (remaining == 0)
        return 0;

    auto count = std::min(size * nitems, remaining);
    assert(count);

    const auto * from = request.data->data() + readOffset;
    memcpy(buffer, from, count);
    readOffset += count;
    return count;
}
static size_t readCallbackWrapper(char *buffer, size_t size, size_t nitems, void * userp)
{
2020-04-06 20:59:36 +00:00
return ((TransferItem *) userp)->readCallback(buffer, size, nitems);
}
void init()
{
if (!req) req = curl_easy_init();
curl_easy_reset(req);
if (verbosity >= lvlVomit) {
curl_easy_setopt(req, CURLOPT_VERBOSE, 1);
2020-04-06 20:59:36 +00:00
curl_easy_setopt(req, CURLOPT_DEBUGFUNCTION, TransferItem::debugCallback);
}
curl_easy_setopt(req, CURLOPT_URL, request.uri.c_str());
curl_easy_setopt(req, CURLOPT_FOLLOWLOCATION, 1L);
2018-06-18 08:36:19 +00:00
curl_easy_setopt(req, CURLOPT_MAXREDIRS, 10);
curl_easy_setopt(req, CURLOPT_NOSIGNAL, 1);
curl_easy_setopt(req, CURLOPT_USERAGENT,
("curl/" LIBCURL_VERSION " Nix/" + nixVersion +
2020-04-06 21:43:43 +00:00
(fileTransferSettings.userAgentSuffix != "" ? " " + fileTransferSettings.userAgentSuffix.get() : "")).c_str());
#if LIBCURL_VERSION_NUM >= 0x072b00
curl_easy_setopt(req, CURLOPT_PIPEWAIT, 1);
#endif
#if LIBCURL_VERSION_NUM >= 0x072f00
2020-04-06 21:43:43 +00:00
if (fileTransferSettings.enableHttp2)
curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2TLS);
else
curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
#endif
2020-04-06 20:59:36 +00:00
curl_easy_setopt(req, CURLOPT_WRITEFUNCTION, TransferItem::writeCallbackWrapper);
curl_easy_setopt(req, CURLOPT_WRITEDATA, this);
2020-04-06 20:59:36 +00:00
curl_easy_setopt(req, CURLOPT_HEADERFUNCTION, TransferItem::headerCallbackWrapper);
curl_easy_setopt(req, CURLOPT_HEADERDATA, this);
curl_easy_setopt(req, CURLOPT_PROGRESSFUNCTION, progressCallbackWrapper);
curl_easy_setopt(req, CURLOPT_PROGRESSDATA, this);
curl_easy_setopt(req, CURLOPT_NOPROGRESS, 0);
curl_easy_setopt(req, CURLOPT_HTTPHEADER, requestHeaders);
if (settings.downloadSpeed.get() > 0)
curl_easy_setopt(req, CURLOPT_MAX_RECV_SPEED_LARGE, (curl_off_t) (settings.downloadSpeed.get() * 1024));
if (request.head)
curl_easy_setopt(req, CURLOPT_NOBODY, 1);
if (request.data) {
curl_easy_setopt(req, CURLOPT_UPLOAD, 1L);
curl_easy_setopt(req, CURLOPT_READFUNCTION, readCallbackWrapper);
curl_easy_setopt(req, CURLOPT_READDATA, this);
curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->length());
}
if (request.verifyTLS) {
if (settings.caFile != "")
curl_easy_setopt(req, CURLOPT_CAINFO, settings.caFile.get().c_str());
} else {
curl_easy_setopt(req, CURLOPT_SSL_VERIFYPEER, 0);
curl_easy_setopt(req, CURLOPT_SSL_VERIFYHOST, 0);
}
2020-04-06 21:43:43 +00:00
curl_easy_setopt(req, CURLOPT_CONNECTTIMEOUT, fileTransferSettings.connectTimeout.get());
curl_easy_setopt(req, CURLOPT_LOW_SPEED_LIMIT, 1L);
2020-04-06 21:43:43 +00:00
curl_easy_setopt(req, CURLOPT_LOW_SPEED_TIME, fileTransferSettings.stalledDownloadTimeout.get());
2017-02-09 17:16:09 +00:00
/* If no file exist in the specified path, curl continues to work
anyway as if netrc support was disabled. */
curl_easy_setopt(req, CURLOPT_NETRC_FILE, settings.netrcFile.get().c_str());
2017-02-01 12:37:34 +00:00
curl_easy_setopt(req, CURLOPT_NETRC, CURL_NETRC_OPTIONAL);
if (writtenToSink)
curl_easy_setopt(req, CURLOPT_RESUME_FROM_LARGE, writtenToSink);
result.data.clear();
result.bodySize = 0;
2015-10-07 15:31:50 +00:00
}
2015-05-05 12:39:48 +00:00
/* Called by the worker thread when curl reports that this transfer is
   done, with curl's result `code`.

   Exactly one of the following happens: the callback receives the
   result, the callback receives an error, or the item is re-enqueued
   for a retry after a delay. */
void finish(CURLcode code)
{
    auto httpStatus = getHTTPStatus();

    /* Initialised so that a failing curl_easy_getinfo() cannot leave
       us reading an indeterminate pointer. */
    char * effectiveUriCStr = nullptr;
    curl_easy_getinfo(req, CURLINFO_EFFECTIVE_URL, &effectiveUriCStr);
    if (effectiveUriCStr)
        result.effectiveUri = effectiveUriCStr;

    debug("finished %s of '%s'; curl status = %d, HTTP status = %d, body = %d bytes",
        request.verb(), request.uri, code, httpStatus, result.bodySize);

    if (decompressionSink) {
        try {
            decompressionSink->finish();
        } catch (...) {
            writeException = std::current_exception();
        }
    }

    /* headerCallback() stops the transfer when a 200 response carries
       the expected ETag; treat that as "not modified". */
    if (code == CURLE_WRITE_ERROR && result.etag == request.expectedETag) {
        code = CURLE_OK;
        httpStatus = 304;
    }

    if (writeException)
        failEx(writeException);

    else if (code == CURLE_OK && successfulStatuses.count(httpStatus))
    {
        result.cached = httpStatus == 304;

        // In 2021, GitHub responds to If-None-Match with 304,
        // but omits ETag. We just use the If-None-Match etag
        // since 304 implies they are the same.
        if (httpStatus == 304 && result.etag == "")
            result.etag = request.expectedETag;

        act.progress(result.bodySize, result.bodySize);
        done = true;
        callback(std::move(result));
    }

    else {
        // We treat most errors as transient, but won't retry when hopeless
        Error err = Transient;

        if (httpStatus == 404 || httpStatus == 410 || code == CURLE_FILE_COULDNT_READ_FILE) {
            // The file is definitely not there
            err = NotFound;
        } else if (httpStatus == 401 || httpStatus == 403 || httpStatus == 407) {
            // Don't retry on authentication/authorization failures
            err = Forbidden;
        } else if (httpStatus >= 400 && httpStatus < 500 && httpStatus != 408 && httpStatus != 429) {
            // Most 4xx errors are client errors and are probably not worth retrying:
            //   * 408 means the server timed out waiting for us, so we try again
            //   * 429 means too many requests, so we retry (with a delay)
            err = Misc;
        } else if (httpStatus == 501 || httpStatus == 505 || httpStatus == 511) {
            // Let's treat most 5xx (server) errors as transient, except for a handful:
            //   * 501 not implemented
            //   * 505 http version not supported
            //   * 511 we're behind a captive portal
            err = Misc;
        } else {
            // Don't bother retrying on certain cURL errors either
            switch (code) {
                case CURLE_FAILED_INIT:
                case CURLE_URL_MALFORMAT:
                case CURLE_NOT_BUILT_IN:
                case CURLE_REMOTE_ACCESS_DENIED:
                case CURLE_FILE_COULDNT_READ_FILE:
                case CURLE_FUNCTION_NOT_FOUND:
                case CURLE_ABORTED_BY_CALLBACK:
                case CURLE_BAD_FUNCTION_ARGUMENT:
                case CURLE_INTERFACE_FAILED:
                case CURLE_UNKNOWN_OPTION:
                case CURLE_SSL_CACERT_BADFILE:
                case CURLE_TOO_MANY_REDIRECTS:
                case CURLE_WRITE_ERROR:
                case CURLE_UNSUPPORTED_PROTOCOL:
                    err = Misc;
                    break;
                default: // Shut up warnings
                    break;
            }
        }

        attempt++;

        /* Attach the body of the failed response, if we kept it. */
        std::optional<std::string> response;
        if (errorSink)
            response = std::move(errorSink->s);
        auto exc =
            code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted
            ? FileTransferError(Interrupted, std::move(response), "%s of '%s' was interrupted", request.verb(), request.uri)
            : httpStatus != 0
            ? FileTransferError(err,
                std::move(response),
                "unable to %s '%s': HTTP error %d%s",
                request.verb(), request.uri, httpStatus,
                code == CURLE_OK ? "" : fmt(" (curl error: %s)", curl_easy_strerror(code)))
            : FileTransferError(err,
                std::move(response),
                "unable to %s '%s': %s (%d)",
                request.verb(), request.uri, curl_easy_strerror(code), code);

        /* If this is a transient error, then maybe retry the
           download after a while. If we're writing to a
           sink, we can only retry if the server supports
           ranged requests. */
        if (err == Transient
            && attempt < request.tries
            && (!this->request.dataCallback
                || writtenToSink == 0
                || (acceptRanges && encoding.empty())))
        {
            /* Exponential back-off with a random jitter. */
            int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(fileTransfer.mt19937));
            if (writtenToSink)
                warn("%s; retrying from offset %d in %d ms", exc.what(), writtenToSink, ms);
            else
                warn("%s; retrying in %d ms", exc.what(), ms);
            embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms);
            fileTransfer.enqueueItem(shared_from_this());
        }
        else
            fail(std::move(exc));
    }
}
};
2015-04-09 10:12:50 +00:00
struct State
{
struct EmbargoComparator {
2020-04-06 20:59:36 +00:00
bool operator() (const std::shared_ptr<TransferItem> & i1, const std::shared_ptr<TransferItem> & i2) {
return i1->embargo > i2->embargo;
}
};
bool quit = false;
2020-04-06 20:59:36 +00:00
std::priority_queue<std::shared_ptr<TransferItem>, std::vector<std::shared_ptr<TransferItem>>, EmbargoComparator> incoming;
};
2015-04-09 10:12:50 +00:00
Sync<State> state_;
2015-05-05 12:39:48 +00:00
/* We can't use a std::condition_variable to wake up the curl
thread, because it only monitors file descriptors. So use a
pipe instead. */
Pipe wakeupPipe;
std::thread workerThread;
2020-04-06 21:43:43 +00:00
curlFileTransfer()
: mt19937(rd())
{
static std::once_flag globalInit;
std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL);
2015-04-09 10:12:50 +00:00
curlm = curl_multi_init();
#if LIBCURL_VERSION_NUM >= 0x072b00 // Multiplex requires >= 7.43.0
curl_multi_setopt(curlm, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);
#endif
#if LIBCURL_VERSION_NUM >= 0x071e00 // Max connections requires >= 7.30.0
curl_multi_setopt(curlm, CURLMOPT_MAX_TOTAL_CONNECTIONS,
2020-04-06 21:43:43 +00:00
fileTransferSettings.httpConnections.get());
#endif
wakeupPipe.create();
fcntl(wakeupPipe.readSide.get(), F_SETFL, O_NONBLOCK);
workerThread = std::thread([&]() { workerThreadEntry(); });
}
2015-04-09 10:12:50 +00:00
2020-04-06 21:43:43 +00:00
/* Shut down in order: ask the worker thread to stop, wait for it to
   exit, and only then release the multi handle it was using. */
~curlFileTransfer()
{
stopWorkerThread();
workerThread.join();
if (curlm) curl_multi_cleanup(curlm);
}
/* Ask the worker thread to exit: set the quit flag, then write to the
   wakeup pipe so the thread notices it. */
void stopWorkerThread()
{
    /* Signal the worker thread to exit. */
    {
        auto state(state_.lock());
        state->quit = true;
    }
    writeFull(wakeupPipe.writeSide.get(), " ", false);
}
void workerThreadMain()
{
/* Cause this thread to be notified on SIGINT. */
auto callback = createInterruptCallback([&]() {
stopWorkerThread();
});
unshareFilesystem();
2020-04-06 20:59:36 +00:00
std::map<CURL *, std::shared_ptr<TransferItem>> items;
2016-10-19 13:02:38 +00:00
bool quit = false;
std::chrono::steady_clock::time_point nextWakeup;
while (!quit) {
checkInterrupt();
/* Let curl do its thing. */
int running;
CURLMcode mc = curl_multi_perform(curlm, &running);
if (mc != CURLM_OK)
throw nix::Error("unexpected error from curl_multi_perform(): %s", curl_multi_strerror(mc));
/* Set the promises of any finished requests. */
CURLMsg * msg;
int left;
while ((msg = curl_multi_info_read(curlm, &left))) {
if (msg->msg == CURLMSG_DONE) {
auto i = items.find(msg->easy_handle);
assert(i != items.end());
i->second->finish(msg->data.result);
curl_multi_remove_handle(curlm, i->second->req);
i->second->active = false;
items.erase(i);
}
}
2015-10-07 15:31:50 +00:00
/* Wait for activity, including wakeup events. */
int numfds = 0;
struct curl_waitfd extraFDs[1];
extraFDs[0].fd = wakeupPipe.readSide.get();
extraFDs[0].events = CURL_WAIT_POLLIN;
extraFDs[0].revents = 0;
long maxSleepTimeMs = items.empty() ? 10000 : 100;
auto sleepTimeMs =
nextWakeup != std::chrono::steady_clock::time_point()
? std::max(0, (int) std::chrono::duration_cast<std::chrono::milliseconds>(nextWakeup - std::chrono::steady_clock::now()).count())
: maxSleepTimeMs;
vomit("download thread waiting for %d ms", sleepTimeMs);
mc = curl_multi_wait(curlm, extraFDs, 1, sleepTimeMs, &numfds);
if (mc != CURLM_OK)
throw nix::Error("unexpected error from curl_multi_wait(): %s", curl_multi_strerror(mc));
nextWakeup = std::chrono::steady_clock::time_point();
/* Add new curl requests from the incoming requests queue,
except for requests that are embargoed (waiting for a
retry timeout to expire). */
if (extraFDs[0].revents & CURL_WAIT_POLLIN) {
char buf[1024];
auto res = read(extraFDs[0].fd, buf, sizeof(buf));
if (res == -1 && errno != EINTR)
throw SysError("reading curl wakeup socket");
}
2015-04-09 10:12:50 +00:00
2020-04-06 20:59:36 +00:00
std::vector<std::shared_ptr<TransferItem>> incoming;
auto now = std::chrono::steady_clock::now();
{
auto state(state_.lock());
while (!state->incoming.empty()) {
auto item = state->incoming.top();
if (item->embargo <= now) {
incoming.push_back(item);
state->incoming.pop();
} else {
if (nextWakeup == std::chrono::steady_clock::time_point()
|| item->embargo < nextWakeup)
nextWakeup = item->embargo;
break;
}
}
quit = state->quit;
}
2016-08-11 15:34:43 +00:00
for (auto & item : incoming) {
2018-06-05 12:37:26 +00:00
debug("starting %s of %s", item->request.verb(), item->request.uri);
item->init();
curl_multi_add_handle(curlm, item->req);
item->active = true;
items[item->req] = item;
}
}
2015-04-09 10:12:50 +00:00
debug("download thread shutting down");
2015-04-09 10:12:50 +00:00
}
void workerThreadEntry()
{
try {
workerThreadMain();
} catch (nix::Interrupted & e) {
} catch (std::exception & e) {
Improve error formatting Changes: * The divider lines are gone. These were in practice a bit confusing, in particular with --show-trace or --keep-going, since then there were multiple lines, suggesting a start/end which wasn't the case. * Instead, multi-line error messages are now indented to align with the prefix (e.g. "error: "). * The 'description' field is gone since we weren't really using it. * 'hint' is renamed to 'msg' since it really wasn't a hint. * The error is now printed *before* the location info. * The 'name' field is no longer printed since most of the time it wasn't very useful since it was just the name of the exception (like EvalError). Ideally in the future this would be a unique, easily googleable error ID (like rustc). * "trace:" is now just "…". This assumes error contexts start with something like "while doing X". Example before: error: --- AssertionError ---------------------------------------------------------------------------------------- nix at: (7:7) in file: /home/eelco/Dev/nixpkgs/pkgs/applications/misc/hello/default.nix 6| 7| x = assert false; 1; | ^ 8| assertion 'false' failed ----------------------------------------------------- show-trace ----------------------------------------------------- trace: while evaluating the attribute 'x' of the derivation 'hello-2.10' at: (192:11) in file: /home/eelco/Dev/nixpkgs/pkgs/stdenv/generic/make-derivation.nix 191| // (lib.optionalAttrs (!(attrs ? name) && attrs ? pname && attrs ? version)) { 192| name = "${attrs.pname}-${attrs.version}"; | ^ 193| } // (lib.optionalAttrs (stdenv.hostPlatform != stdenv.buildPlatform && !dontAddHostSuffix && (attrs ? name || (attrs ? pname && attrs ? 
version)))) { Example after: error: assertion 'false' failed at: (7:7) in file: /home/eelco/Dev/nixpkgs/pkgs/applications/misc/hello/default.nix 6| 7| x = assert false; 1; | ^ 8| … while evaluating the attribute 'x' of the derivation 'hello-2.10' at: (192:11) in file: /home/eelco/Dev/nixpkgs/pkgs/stdenv/generic/make-derivation.nix 191| // (lib.optionalAttrs (!(attrs ? name) && attrs ? pname && attrs ? version)) { 192| name = "${attrs.pname}-${attrs.version}"; | ^ 193| } // (lib.optionalAttrs (stdenv.hostPlatform != stdenv.buildPlatform && !dontAddHostSuffix && (attrs ? name || (attrs ? pname && attrs ? version)))) {
2021-01-20 23:27:36 +00:00
printError("unexpected error in download thread: %s", e.what());
}
{
auto state(state_.lock());
while (!state->incoming.empty()) state->incoming.pop();
state->quit = true;
}
}
2020-04-06 20:59:36 +00:00
void enqueueItem(std::shared_ptr<TransferItem> item)
{
2018-06-05 12:44:26 +00:00
if (item->request.data
&& !hasPrefix(item->request.uri, "http://")
&& !hasPrefix(item->request.uri, "https://"))
throw nix::Error("uploading to '%s' is not supported", item->request.uri);
{
auto state(state_.lock());
if (state->quit)
throw nix::Error("cannot enqueue download request because the download thread is shutting down");
state->incoming.push(item);
}
writeFull(wakeupPipe.writeSide.get(), " ");
}
2021-04-23 06:30:05 +00:00
#if ENABLE_S3
std::tuple<std::string, std::string, Store::Params> parseS3Uri(std::string uri)
{
auto [path, params] = splitUriAndParams(uri);
auto slash = path.find('/', 5); // 5 is the length of "s3://" prefix
2019-02-26 10:04:18 +00:00
if (slash == std::string::npos)
throw nix::Error("bad S3 URI '%s'", path);
std::string bucketName(path, 5, slash - 5);
std::string key(path, slash + 1);
return {bucketName, key, params};
}
#endif
2020-04-06 21:43:43 +00:00
void enqueueFileTransfer(const FileTransferRequest & request,
Callback<FileTransferResult> callback) override
{
/* Ugly hack to support s3:// URIs. */
if (hasPrefix(request.uri, "s3://")) {
// FIXME: do this on a worker thread
2018-03-27 20:16:01 +00:00
try {
2021-04-23 06:30:05 +00:00
#if ENABLE_S3
auto [bucketName, key, params] = parseS3Uri(request.uri);
std::string profile = getOr(params, "profile", "");
std::string region = getOr(params, "region", Aws::Region::US_EAST_1);
std::string scheme = getOr(params, "scheme", "");
std::string endpoint = getOr(params, "endpoint", "");
S3Helper s3Helper(profile, region, scheme, endpoint);
// FIXME: implement ETag
auto s3Res = s3Helper.getObject(bucketName, key);
2020-04-06 21:43:43 +00:00
FileTransferResult res;
if (!s3Res.data)
throw FileTransferError(NotFound, "S3 object '%s' does not exist", request.uri);
res.data = std::move(*s3Res.data);
2018-03-27 20:16:01 +00:00
callback(std::move(res));
#else
throw nix::Error("cannot download '%s' because Nix is not built with S3 support", request.uri);
#endif
2018-03-27 20:16:01 +00:00
} catch (...) { callback.rethrow(); }
return;
}
2020-04-06 20:59:36 +00:00
enqueueItem(std::make_shared<TransferItem>(*this, request, std::move(callback)));
}
2015-04-09 10:12:50 +00:00
};
/* Create a new curl-based file transfer object. */
ref<curlFileTransfer> makeCurlFileTransfer()
{
return make_ref<curlFileTransfer>();
}
2020-04-06 21:43:43 +00:00
/* Return the shared file transfer object. If its quit flag is set
   (the worker thread was stopped or has exited), it is replaced by a
   fresh instance first. */
ref<FileTransfer> getFileTransfer()
{
    static ref<curlFileTransfer> fileTransfer = makeCurlFileTransfer();

    if (fileTransfer->state_.lock()->quit)
        fileTransfer = makeCurlFileTransfer();

    return fileTransfer;
}
2020-04-06 21:43:43 +00:00
/* Create a new, independent file transfer object (as opposed to the
   shared one returned by getFileTransfer()). */
ref<FileTransfer> makeFileTransfer()
{
    return makeCurlFileTransfer();
}
2020-04-06 21:43:43 +00:00
std::future<FileTransferResult> FileTransfer::enqueueFileTransfer(const FileTransferRequest & request)
{
2020-04-06 21:43:43 +00:00
auto promise = std::make_shared<std::promise<FileTransferResult>>();
enqueueFileTransfer(request,
{[promise](std::future<FileTransferResult> fut) {
2018-03-27 20:16:01 +00:00
try {
promise->set_value(fut.get());
} catch (...) {
promise->set_exception(std::current_exception());
}
}});
return promise->get_future();
}
2020-04-06 21:43:43 +00:00
/* Perform `request` and block until it has finished. Returns the
   result, or rethrows the error the transfer failed with. */
FileTransferResult FileTransfer::download(const FileTransferRequest & request)
{
    return enqueueFileTransfer(request).get();
}
2020-04-06 21:43:43 +00:00
/* Perform the upload described by `request` and block until it has
   finished. */
FileTransferResult FileTransfer::upload(const FileTransferRequest & request)
{
    /* Note: this method is the same as download, but helps in readability */
    return enqueueFileTransfer(request).get();
}
2020-04-06 21:43:43 +00:00
/* Download `request` and stream the body into `sink` on the calling
   thread. Blocks until the transfer has finished; rethrows the error
   the transfer failed with. */
void FileTransfer::download(FileTransferRequest && request, Sink & sink)
{
    /* Note: we can't call 'sink' via request.dataCallback, because
       that would cause the sink to execute on the fileTransfer
       thread. If 'sink' is a coroutine, this will fail. Also, if the
       sink is expensive (e.g. one that does decompression and writing
       to the Nix store), it would stall the download thread too much.
       Therefore we use a buffer to communicate data between the
       download thread and the calling thread. */

    struct State {
        bool quit = false;
        std::exception_ptr exc;
        std::string data;
        /* `avail` signals the calling thread that data (or the end of
           the transfer) is available; `request` signals the download
           thread that the buffer has been drained. */
        std::condition_variable avail, request;
    };

    auto _state = std::make_shared<Sync<State>>();

    /* In case of an exception, wake up the download thread. FIXME:
       abort the download request. */
    Finally finally([&]() {
        auto state(_state->lock());
        state->quit = true;
        state->request.notify_one();
    });

    request.dataCallback = [_state](std::string_view data) {

        auto state(_state->lock());

        if (state->quit) return;

        /* If the buffer is full, then go to sleep until the calling
           thread wakes us up (i.e. when it has removed data from the
           buffer). We don't wait forever to prevent stalling the
           download thread. (Hopefully sleeping will throttle the
           sender.) */
        if (state->data.size() > 1024 * 1024) {
            debug("download buffer is full; going to sleep");
            state.wait_for(state->request, std::chrono::seconds(10));
        }

        /* Append data to the buffer and wake up the calling
           thread. */
        state->data.append(data);
        state->avail.notify_one();
    };

    enqueueFileTransfer(request,
        {[_state](std::future<FileTransferResult> fut) {
            auto state(_state->lock());
            state->quit = true;
            try {
                fut.get();
            } catch (...) {
                state->exc = std::current_exception();
            }
            state->avail.notify_one();
            state->request.notify_one();
        }});

    while (true) {
        checkInterrupt();

        std::string chunk;

        /* Grab data if available, otherwise wait for the download
           thread to wake us up. */
        {
            auto state(_state->lock());

            if (state->data.empty()) {

                if (state->quit) {
                    if (state->exc) std::rethrow_exception(state->exc);
                    return;
                }

                state.wait(state->avail);

                /* Woken up without data: go around again so the quit
                   flag is re-checked. */
                if (state->data.empty()) continue;
            }

            chunk = std::move(state->data);

            state->request.notify_one();
        }

        /* Flush the data to the sink and wake up the download thread
           if it's blocked on a full buffer. We don't hold the state
           lock while doing this to prevent blocking the download
           thread if sink() takes a long time. */
        sink(chunk);
    }
}
template<typename... Args>
FileTransferError::FileTransferError(FileTransfer::Error error, std::optional<std::string> response, const Args & ... args)
: Error(args...), error(error), response(response)
{
const auto hf = hintfmt(args...);
// FIXME: Due to https://github.com/NixOS/nix/issues/3841 we don't know how
// to print different messages for different verbosity levels. For now
// we add some heuristics for detecting when we want to show the response.
if (response && (response->size() < 1024 || response->find("<html>") != std::string::npos))
2021-01-27 13:04:49 +00:00
err.msg = hintfmt("%1%\n\nresponse body:\n\n%2%", normaltxt(hf.str()), chomp(*response));
Improve error formatting Changes: * The divider lines are gone. These were in practice a bit confusing, in particular with --show-trace or --keep-going, since then there were multiple lines, suggesting a start/end which wasn't the case. * Instead, multi-line error messages are now indented to align with the prefix (e.g. "error: "). * The 'description' field is gone since we weren't really using it. * 'hint' is renamed to 'msg' since it really wasn't a hint. * The error is now printed *before* the location info. * The 'name' field is no longer printed since most of the time it wasn't very useful since it was just the name of the exception (like EvalError). Ideally in the future this would be a unique, easily googleable error ID (like rustc). * "trace:" is now just "…". This assumes error contexts start with something like "while doing X". Example before: error: --- AssertionError ---------------------------------------------------------------------------------------- nix at: (7:7) in file: /home/eelco/Dev/nixpkgs/pkgs/applications/misc/hello/default.nix 6| 7| x = assert false; 1; | ^ 8| assertion 'false' failed ----------------------------------------------------- show-trace ----------------------------------------------------- trace: while evaluating the attribute 'x' of the derivation 'hello-2.10' at: (192:11) in file: /home/eelco/Dev/nixpkgs/pkgs/stdenv/generic/make-derivation.nix 191| // (lib.optionalAttrs (!(attrs ? name) && attrs ? pname && attrs ? version)) { 192| name = "${attrs.pname}-${attrs.version}"; | ^ 193| } // (lib.optionalAttrs (stdenv.hostPlatform != stdenv.buildPlatform && !dontAddHostSuffix && (attrs ? name || (attrs ? pname && attrs ? 
version)))) { Example after: error: assertion 'false' failed at: (7:7) in file: /home/eelco/Dev/nixpkgs/pkgs/applications/misc/hello/default.nix 6| 7| x = assert false; 1; | ^ 8| … while evaluating the attribute 'x' of the derivation 'hello-2.10' at: (192:11) in file: /home/eelco/Dev/nixpkgs/pkgs/stdenv/generic/make-derivation.nix 191| // (lib.optionalAttrs (!(attrs ? name) && attrs ? pname && attrs ? version)) { 192| name = "${attrs.pname}-${attrs.version}"; | ^ 193| } // (lib.optionalAttrs (stdenv.hostPlatform != stdenv.buildPlatform && !dontAddHostSuffix && (attrs ? name || (attrs ? pname && attrs ? version)))) {
2021-01-20 23:27:36 +00:00
else
err.msg = hf;
}
2015-04-09 10:12:50 +00:00
}