libstore: improve curl wrapper resource handling
wrap all curl pointers in unique pointers for move correctness. the
destructor of TransferItem can go away completely since it does not
do anything special any more: memory bookkeeping is done using raii
wrappers, and its effects on the transfer state are not observable.
only TransferSource inspects the result, and even it does not check
for transfer results in its own destructor (only during transfers).
Change-Id: Ibb58484703854d5d5425e7c63cf2985d93920e97
This commit is contained in:
parent
43777939eb
commit
4ba19f68fb
|
@ -36,7 +36,7 @@ static GlobalConfig::Register rFileTransferSettings(&fileTransferSettings);
|
||||||
|
|
||||||
struct curlFileTransfer : public FileTransfer
|
struct curlFileTransfer : public FileTransfer
|
||||||
{
|
{
|
||||||
CURLM * curlm = 0;
|
std::unique_ptr<CURLM, decltype([](auto * m) { curl_multi_cleanup(m); })> curlm;
|
||||||
|
|
||||||
const unsigned int baseRetryTimeMs;
|
const unsigned int baseRetryTimeMs;
|
||||||
|
|
||||||
|
@ -65,12 +65,12 @@ struct curlFileTransfer : public FileTransfer
|
||||||
transferComplete,
|
transferComplete,
|
||||||
} phase = initialSetup;
|
} phase = initialSetup;
|
||||||
std::promise<FileTransferResult> metadataPromise;
|
std::promise<FileTransferResult> metadataPromise;
|
||||||
CURL * req; // must never be nullptr
|
|
||||||
std::string statusMsg;
|
std::string statusMsg;
|
||||||
|
|
||||||
uint64_t bodySize = 0;
|
uint64_t bodySize = 0;
|
||||||
|
|
||||||
struct curl_slist * requestHeaders = 0;
|
std::unique_ptr<curl_slist, decltype([](auto * s) { curl_slist_free_all(s); })> requestHeaders;
|
||||||
|
std::unique_ptr<CURL, decltype([](auto * c) { curl_easy_cleanup(c); })> req;
|
||||||
|
|
||||||
inline static const std::set<long> successfulStatuses {200, 201, 204, 206, 304, 0 /* other protocol */};
|
inline static const std::set<long> successfulStatuses {200, 201, 204, 206, 304, 0 /* other protocol */};
|
||||||
/* Get the HTTP status code, or 0 for other protocols. */
|
/* Get the HTTP status code, or 0 for other protocols. */
|
||||||
|
@ -78,9 +78,9 @@ struct curlFileTransfer : public FileTransfer
|
||||||
{
|
{
|
||||||
long httpStatus = 0;
|
long httpStatus = 0;
|
||||||
long protocol = 0;
|
long protocol = 0;
|
||||||
curl_easy_getinfo(req, CURLINFO_PROTOCOL, &protocol);
|
curl_easy_getinfo(req.get(), CURLINFO_PROTOCOL, &protocol);
|
||||||
if (protocol == CURLPROTO_HTTP || protocol == CURLPROTO_HTTPS)
|
if (protocol == CURLPROTO_HTTP || protocol == CURLPROTO_HTTPS)
|
||||||
curl_easy_getinfo(req, CURLINFO_RESPONSE_CODE, &httpStatus);
|
curl_easy_getinfo(req.get(), CURLINFO_RESPONSE_CODE, &httpStatus);
|
||||||
return httpStatus;
|
return httpStatus;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,86 +108,83 @@ struct curlFileTransfer : public FileTransfer
|
||||||
throw FileTransferError(Misc, {}, "could not allocate curl handle");
|
throw FileTransferError(Misc, {}, "could not allocate curl handle");
|
||||||
}
|
}
|
||||||
for (auto it = headers.begin(); it != headers.end(); ++it){
|
for (auto it = headers.begin(); it != headers.end(); ++it){
|
||||||
requestHeaders = curl_slist_append(requestHeaders, fmt("%s: %s", it->first, it->second).c_str());
|
if (auto next = curl_slist_append(
|
||||||
|
requestHeaders.get(), fmt("%s: %s", it->first, it->second).c_str()
|
||||||
|
);
|
||||||
|
next != nullptr)
|
||||||
|
{
|
||||||
|
(void) requestHeaders.release(); // next now owns this pointer
|
||||||
|
requestHeaders.reset(next);
|
||||||
|
} else {
|
||||||
|
throw FileTransferError(Misc, {}, "could not allocate curl request headers");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (verbosity >= lvlVomit) {
|
if (verbosity >= lvlVomit) {
|
||||||
curl_easy_setopt(req, CURLOPT_VERBOSE, 1);
|
curl_easy_setopt(req.get(), CURLOPT_VERBOSE, 1);
|
||||||
curl_easy_setopt(req, CURLOPT_DEBUGFUNCTION, TransferItem::debugCallback);
|
curl_easy_setopt(req.get(), CURLOPT_DEBUGFUNCTION, TransferItem::debugCallback);
|
||||||
}
|
}
|
||||||
|
|
||||||
curl_easy_setopt(req, CURLOPT_URL, uri.c_str());
|
curl_easy_setopt(req.get(), CURLOPT_URL, uri.c_str());
|
||||||
curl_easy_setopt(req, CURLOPT_FOLLOWLOCATION, 1L);
|
curl_easy_setopt(req.get(), CURLOPT_FOLLOWLOCATION, 1L);
|
||||||
curl_easy_setopt(req, CURLOPT_ACCEPT_ENCODING, ""); // all of them!
|
curl_easy_setopt(req.get(), CURLOPT_ACCEPT_ENCODING, ""); // all of them!
|
||||||
curl_easy_setopt(req, CURLOPT_MAXREDIRS, 10);
|
curl_easy_setopt(req.get(), CURLOPT_MAXREDIRS, 10);
|
||||||
curl_easy_setopt(req, CURLOPT_NOSIGNAL, 1);
|
curl_easy_setopt(req.get(), CURLOPT_NOSIGNAL, 1);
|
||||||
curl_easy_setopt(req, CURLOPT_USERAGENT,
|
curl_easy_setopt(req.get(), CURLOPT_USERAGENT,
|
||||||
("curl/" LIBCURL_VERSION " Lix/" + nixVersion +
|
("curl/" LIBCURL_VERSION " Lix/" + nixVersion +
|
||||||
(fileTransferSettings.userAgentSuffix != "" ? " " + fileTransferSettings.userAgentSuffix.get() : "")).c_str());
|
(fileTransferSettings.userAgentSuffix != "" ? " " + fileTransferSettings.userAgentSuffix.get() : "")).c_str());
|
||||||
curl_easy_setopt(req, CURLOPT_PIPEWAIT, 1);
|
curl_easy_setopt(req.get(), CURLOPT_PIPEWAIT, 1);
|
||||||
if (fileTransferSettings.enableHttp2)
|
if (fileTransferSettings.enableHttp2)
|
||||||
curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2TLS);
|
curl_easy_setopt(req.get(), CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2TLS);
|
||||||
else
|
else
|
||||||
curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
|
curl_easy_setopt(req.get(), CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
|
||||||
curl_easy_setopt(req, CURLOPT_WRITEFUNCTION, TransferItem::writeCallbackWrapper);
|
curl_easy_setopt(req.get(), CURLOPT_WRITEFUNCTION, TransferItem::writeCallbackWrapper);
|
||||||
curl_easy_setopt(req, CURLOPT_WRITEDATA, this);
|
curl_easy_setopt(req.get(), CURLOPT_WRITEDATA, this);
|
||||||
curl_easy_setopt(req, CURLOPT_HEADERFUNCTION, TransferItem::headerCallbackWrapper);
|
curl_easy_setopt(req.get(), CURLOPT_HEADERFUNCTION, TransferItem::headerCallbackWrapper);
|
||||||
curl_easy_setopt(req, CURLOPT_HEADERDATA, this);
|
curl_easy_setopt(req.get(), CURLOPT_HEADERDATA, this);
|
||||||
|
|
||||||
curl_easy_setopt(req, CURLOPT_PROGRESSFUNCTION, progressCallbackWrapper);
|
curl_easy_setopt(req.get(), CURLOPT_PROGRESSFUNCTION, progressCallbackWrapper);
|
||||||
curl_easy_setopt(req, CURLOPT_PROGRESSDATA, this);
|
curl_easy_setopt(req.get(), CURLOPT_PROGRESSDATA, this);
|
||||||
curl_easy_setopt(req, CURLOPT_NOPROGRESS, 0);
|
curl_easy_setopt(req.get(), CURLOPT_NOPROGRESS, 0);
|
||||||
|
|
||||||
curl_easy_setopt(req, CURLOPT_PROTOCOLS_STR, "http,https,ftp,ftps");
|
curl_easy_setopt(req.get(), CURLOPT_PROTOCOLS_STR, "http,https,ftp,ftps");
|
||||||
|
|
||||||
curl_easy_setopt(req, CURLOPT_HTTPHEADER, requestHeaders);
|
curl_easy_setopt(req.get(), CURLOPT_HTTPHEADER, requestHeaders.get());
|
||||||
|
|
||||||
if (settings.downloadSpeed.get() > 0)
|
if (settings.downloadSpeed.get() > 0)
|
||||||
curl_easy_setopt(req, CURLOPT_MAX_RECV_SPEED_LARGE, (curl_off_t) (settings.downloadSpeed.get() * 1024));
|
curl_easy_setopt(req.get(), CURLOPT_MAX_RECV_SPEED_LARGE, (curl_off_t) (settings.downloadSpeed.get() * 1024));
|
||||||
|
|
||||||
if (noBody)
|
if (noBody)
|
||||||
curl_easy_setopt(req, CURLOPT_NOBODY, 1);
|
curl_easy_setopt(req.get(), CURLOPT_NOBODY, 1);
|
||||||
|
|
||||||
if (uploadData) {
|
if (uploadData) {
|
||||||
this->uploadData.reset(fmemopen(const_cast<char *>(uploadData->data()), uploadData->size(), "r"));
|
this->uploadData.reset(fmemopen(const_cast<char *>(uploadData->data()), uploadData->size(), "r"));
|
||||||
curl_easy_setopt(req, CURLOPT_UPLOAD, 1L);
|
curl_easy_setopt(req.get(), CURLOPT_UPLOAD, 1L);
|
||||||
curl_easy_setopt(req, CURLOPT_READDATA, this->uploadData.get());
|
curl_easy_setopt(req.get(), CURLOPT_READDATA, this->uploadData.get());
|
||||||
curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) uploadData->length());
|
curl_easy_setopt(req.get(), CURLOPT_INFILESIZE_LARGE, (curl_off_t) uploadData->length());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (settings.caFile != "")
|
if (settings.caFile != "")
|
||||||
curl_easy_setopt(req, CURLOPT_CAINFO, settings.caFile.get().c_str());
|
curl_easy_setopt(req.get(), CURLOPT_CAINFO, settings.caFile.get().c_str());
|
||||||
|
|
||||||
curl_easy_setopt(req, CURLOPT_CONNECTTIMEOUT, fileTransferSettings.connectTimeout.get());
|
curl_easy_setopt(req.get(), CURLOPT_CONNECTTIMEOUT, fileTransferSettings.connectTimeout.get());
|
||||||
|
|
||||||
curl_easy_setopt(req, CURLOPT_LOW_SPEED_LIMIT, 1L);
|
curl_easy_setopt(req.get(), CURLOPT_LOW_SPEED_LIMIT, 1L);
|
||||||
curl_easy_setopt(req, CURLOPT_LOW_SPEED_TIME, fileTransferSettings.stalledDownloadTimeout.get());
|
curl_easy_setopt(req.get(), CURLOPT_LOW_SPEED_TIME, fileTransferSettings.stalledDownloadTimeout.get());
|
||||||
|
|
||||||
/* If no file exist in the specified path, curl continues to work
|
/* If no file exist in the specified path, curl continues to work
|
||||||
anyway as if netrc support was disabled. */
|
anyway as if netrc support was disabled. */
|
||||||
curl_easy_setopt(req, CURLOPT_NETRC_FILE, settings.netrcFile.get().c_str());
|
curl_easy_setopt(req.get(), CURLOPT_NETRC_FILE, settings.netrcFile.get().c_str());
|
||||||
curl_easy_setopt(req, CURLOPT_NETRC, CURL_NETRC_OPTIONAL);
|
curl_easy_setopt(req.get(), CURLOPT_NETRC, CURL_NETRC_OPTIONAL);
|
||||||
|
|
||||||
if (writtenToSink)
|
if (writtenToSink)
|
||||||
curl_easy_setopt(req, CURLOPT_RESUME_FROM_LARGE, writtenToSink);
|
curl_easy_setopt(req.get(), CURLOPT_RESUME_FROM_LARGE, writtenToSink);
|
||||||
}
|
|
||||||
|
|
||||||
~TransferItem()
|
|
||||||
{
|
|
||||||
curl_easy_cleanup(req);
|
|
||||||
if (requestHeaders) curl_slist_free_all(requestHeaders);
|
|
||||||
try {
|
|
||||||
if (phase != transferComplete)
|
|
||||||
fail(FileTransferError(Interrupted, {}, "download of '%s' was interrupted", uri));
|
|
||||||
} catch (...) {
|
|
||||||
ignoreExceptionInDestructor();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool acceptsRanges()
|
bool acceptsRanges()
|
||||||
{
|
{
|
||||||
curl_header * h;
|
curl_header * h;
|
||||||
if (curl_easy_header(req, "accept-ranges", 0, CURLH_HEADER, -1, &h)) {
|
if (curl_easy_header(req.get(), "accept-ranges", 0, CURLH_HEADER, -1, &h)) {
|
||||||
// treat any error as the remote not accepting range requests. the only
|
// treat any error as the remote not accepting range requests. the only
|
||||||
// interesting local error is out-of-memory, which we can't even handle
|
// interesting local error is out-of-memory, which we can't even handle
|
||||||
return false;
|
return false;
|
||||||
|
@ -220,7 +217,7 @@ struct curlFileTransfer : public FileTransfer
|
||||||
}
|
}
|
||||||
|
|
||||||
char * effectiveUriCStr = nullptr;
|
char * effectiveUriCStr = nullptr;
|
||||||
curl_easy_getinfo(req, CURLINFO_EFFECTIVE_URL, &effectiveUriCStr);
|
curl_easy_getinfo(req.get(), CURLINFO_EFFECTIVE_URL, &effectiveUriCStr);
|
||||||
if (effectiveUriCStr) {
|
if (effectiveUriCStr) {
|
||||||
result.effectiveUri = effectiveUriCStr;
|
result.effectiveUri = effectiveUriCStr;
|
||||||
}
|
}
|
||||||
|
@ -456,15 +453,18 @@ struct curlFileTransfer : public FileTransfer
|
||||||
std::thread workerThread;
|
std::thread workerThread;
|
||||||
|
|
||||||
curlFileTransfer(unsigned int baseRetryTimeMs)
|
curlFileTransfer(unsigned int baseRetryTimeMs)
|
||||||
: baseRetryTimeMs(baseRetryTimeMs)
|
: curlm(curl_multi_init())
|
||||||
|
, baseRetryTimeMs(baseRetryTimeMs)
|
||||||
{
|
{
|
||||||
|
if (curlm == nullptr) {
|
||||||
|
throw FileTransferError(Misc, {}, "could not allocate curl handle");
|
||||||
|
}
|
||||||
|
|
||||||
static std::once_flag globalInit;
|
static std::once_flag globalInit;
|
||||||
std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL);
|
std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL);
|
||||||
|
|
||||||
curlm = curl_multi_init();
|
curl_multi_setopt(curlm.get(), CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);
|
||||||
|
curl_multi_setopt(curlm.get(), CURLMOPT_MAX_TOTAL_CONNECTIONS,
|
||||||
curl_multi_setopt(curlm, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);
|
|
||||||
curl_multi_setopt(curlm, CURLMOPT_MAX_TOTAL_CONNECTIONS,
|
|
||||||
fileTransferSettings.httpConnections.get());
|
fileTransferSettings.httpConnections.get());
|
||||||
|
|
||||||
workerThread = std::thread([&]() { workerThreadEntry(); });
|
workerThread = std::thread([&]() { workerThreadEntry(); });
|
||||||
|
@ -484,13 +484,11 @@ struct curlFileTransfer : public FileTransfer
|
||||||
std::terminate();
|
std::terminate();
|
||||||
}
|
}
|
||||||
workerThread.join();
|
workerThread.join();
|
||||||
|
|
||||||
if (curlm) curl_multi_cleanup(curlm);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void wakeup()
|
void wakeup()
|
||||||
{
|
{
|
||||||
if (auto mc = curl_multi_wakeup(curlm))
|
if (auto mc = curl_multi_wakeup(curlm.get()))
|
||||||
throw nix::Error("unexpected error from curl_multi_wakeup(): %s", curl_multi_strerror(mc));
|
throw nix::Error("unexpected error from curl_multi_wakeup(): %s", curl_multi_strerror(mc));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -527,32 +525,32 @@ struct curlFileTransfer : public FileTransfer
|
||||||
{
|
{
|
||||||
auto cancel = [&] { return std::move(state_.lock()->cancel); }();
|
auto cancel = [&] { return std::move(state_.lock()->cancel); }();
|
||||||
for (auto & [item, promise] : cancel) {
|
for (auto & [item, promise] : cancel) {
|
||||||
curl_multi_remove_handle(curlm, item->req);
|
curl_multi_remove_handle(curlm.get(), item->req.get());
|
||||||
promise.set_value();
|
promise.set_value();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Let curl do its thing. */
|
/* Let curl do its thing. */
|
||||||
int running;
|
int running;
|
||||||
CURLMcode mc = curl_multi_perform(curlm, &running);
|
CURLMcode mc = curl_multi_perform(curlm.get(), &running);
|
||||||
if (mc != CURLM_OK)
|
if (mc != CURLM_OK)
|
||||||
throw nix::Error("unexpected error from curl_multi_perform(): %s", curl_multi_strerror(mc));
|
throw nix::Error("unexpected error from curl_multi_perform(): %s", curl_multi_strerror(mc));
|
||||||
|
|
||||||
/* Set the promises of any finished requests. */
|
/* Set the promises of any finished requests. */
|
||||||
CURLMsg * msg;
|
CURLMsg * msg;
|
||||||
int left;
|
int left;
|
||||||
while ((msg = curl_multi_info_read(curlm, &left))) {
|
while ((msg = curl_multi_info_read(curlm.get(), &left))) {
|
||||||
if (msg->msg == CURLMSG_DONE) {
|
if (msg->msg == CURLMSG_DONE) {
|
||||||
auto i = items.find(msg->easy_handle);
|
auto i = items.find(msg->easy_handle);
|
||||||
assert(i != items.end());
|
assert(i != items.end());
|
||||||
i->second->finish(msg->data.result);
|
i->second->finish(msg->data.result);
|
||||||
curl_multi_remove_handle(curlm, i->second->req);
|
curl_multi_remove_handle(curlm.get(), i->second->req.get());
|
||||||
items.erase(i);
|
items.erase(i);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Wait for activity, including wakeup events. */
|
/* Wait for activity, including wakeup events. */
|
||||||
mc = curl_multi_poll(curlm, nullptr, 0, std::min<int64_t>(timeoutMs, INT_MAX), nullptr);
|
mc = curl_multi_poll(curlm.get(), nullptr, 0, std::min<int64_t>(timeoutMs, INT_MAX), nullptr);
|
||||||
if (mc != CURLM_OK)
|
if (mc != CURLM_OK)
|
||||||
throw nix::Error("unexpected error from curl_multi_poll(): %s", curl_multi_strerror(mc));
|
throw nix::Error("unexpected error from curl_multi_poll(): %s", curl_multi_strerror(mc));
|
||||||
|
|
||||||
|
@ -567,7 +565,7 @@ struct curlFileTransfer : public FileTransfer
|
||||||
{
|
{
|
||||||
auto unpause = [&] { return std::move(state_.lock()->unpause); }();
|
auto unpause = [&] { return std::move(state_.lock()->unpause); }();
|
||||||
for (auto & item : unpause) {
|
for (auto & item : unpause) {
|
||||||
curl_easy_pause(item->req, CURLPAUSE_CONT);
|
curl_easy_pause(item->req.get(), CURLPAUSE_CONT);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -579,8 +577,8 @@ struct curlFileTransfer : public FileTransfer
|
||||||
|
|
||||||
for (auto & item : incoming) {
|
for (auto & item : incoming) {
|
||||||
debug("starting %s of %s", item->verb(), item->uri);
|
debug("starting %s of %s", item->verb(), item->uri);
|
||||||
curl_multi_add_handle(curlm, item->req);
|
curl_multi_add_handle(curlm.get(), item->req.get());
|
||||||
items[item->req] = item;
|
items[item->req.get()] = item;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue