libstore: use curl_multi_{poll,wakeup}

the previous solution to the wakeup problem (adding a pipe and passing
it as an additional fd to curl_multi_wait) worked, but there have been
builtin alternatives for this since 2020. not only do these save code,
they're also a lot more likely to work natively on windows when needed

Change-Id: Iab751b900997110a8d15de45ea3ab0c42f7e5973
This commit is contained in:
eldritch horrors 2024-04-22 20:54:02 +02:00
parent e5903aab65
commit ff9a4fc336

View file

@ -504,11 +504,6 @@ struct curlFileTransfer : public FileTransfer
Sync<State> state_; Sync<State> state_;
/* We can't use a std::condition_variable to wake up the curl
thread, because it only monitors file descriptors. So use a
pipe instead. */
Pipe wakeupPipe;
std::thread workerThread; std::thread workerThread;
curlFileTransfer() curlFileTransfer()
@ -523,9 +518,6 @@ struct curlFileTransfer : public FileTransfer
curl_multi_setopt(curlm, CURLMOPT_MAX_TOTAL_CONNECTIONS, curl_multi_setopt(curlm, CURLMOPT_MAX_TOTAL_CONNECTIONS,
fileTransferSettings.httpConnections.get()); fileTransferSettings.httpConnections.get());
wakeupPipe.create();
fcntl(wakeupPipe.readSide.get(), F_SETFL, O_NONBLOCK);
workerThread = std::thread([&]() { workerThreadEntry(); }); workerThread = std::thread([&]() { workerThreadEntry(); });
} }
@ -538,6 +530,12 @@ struct curlFileTransfer : public FileTransfer
if (curlm) curl_multi_cleanup(curlm); if (curlm) curl_multi_cleanup(curlm);
} }
void wakeup()
{
if (auto mc = curl_multi_wakeup(curlm))
throw nix::Error("unexpected error from curl_multi_wakeup(): %s", curl_multi_strerror(mc));
}
void stopWorkerThread() void stopWorkerThread()
{ {
/* Signal the worker thread to exit. */ /* Signal the worker thread to exit. */
@ -545,7 +543,7 @@ struct curlFileTransfer : public FileTransfer
auto state(state_.lock()); auto state(state_.lock());
state->quit = true; state->quit = true;
} }
writeFull(wakeupPipe.writeSide.get(), " ", false); wakeup();
} }
void workerThreadMain() void workerThreadMain()
@ -587,32 +585,21 @@ struct curlFileTransfer : public FileTransfer
} }
/* Wait for activity, including wakeup events. */ /* Wait for activity, including wakeup events. */
int numfds = 0;
struct curl_waitfd extraFDs[1];
extraFDs[0].fd = wakeupPipe.readSide.get();
extraFDs[0].events = CURL_WAIT_POLLIN;
extraFDs[0].revents = 0;
long maxSleepTimeMs = items.empty() ? 10000 : 100; long maxSleepTimeMs = items.empty() ? 10000 : 100;
auto sleepTimeMs = auto sleepTimeMs =
nextWakeup != std::chrono::steady_clock::time_point() nextWakeup != std::chrono::steady_clock::time_point()
? std::max(0, (int) std::chrono::duration_cast<std::chrono::milliseconds>(nextWakeup - std::chrono::steady_clock::now()).count()) ? std::max(0, (int) std::chrono::duration_cast<std::chrono::milliseconds>(nextWakeup - std::chrono::steady_clock::now()).count())
: maxSleepTimeMs; : maxSleepTimeMs;
vomit("download thread waiting for %d ms", sleepTimeMs); vomit("download thread waiting for %d ms", sleepTimeMs);
mc = curl_multi_wait(curlm, extraFDs, 1, sleepTimeMs, &numfds); mc = curl_multi_poll(curlm, nullptr, 0, sleepTimeMs, nullptr);
if (mc != CURLM_OK) if (mc != CURLM_OK)
throw nix::Error("unexpected error from curl_multi_wait(): %s", curl_multi_strerror(mc)); throw nix::Error("unexpected error from curl_multi_poll(): %s", curl_multi_strerror(mc));
nextWakeup = std::chrono::steady_clock::time_point(); nextWakeup = std::chrono::steady_clock::time_point();
/* Add new curl requests from the incoming requests queue, /* Add new curl requests from the incoming requests queue,
except for requests that are embargoed (waiting for a except for requests that are embargoed (waiting for a
retry timeout to expire). */ retry timeout to expire). */
if (extraFDs[0].revents & CURL_WAIT_POLLIN) {
char buf[1024];
auto res = read(extraFDs[0].fd, buf, sizeof(buf));
if (res == -1 && errno != EINTR)
throw SysError("reading curl wakeup socket");
}
std::vector<std::shared_ptr<TransferItem>> incoming; std::vector<std::shared_ptr<TransferItem>> incoming;
auto now = std::chrono::steady_clock::now(); auto now = std::chrono::steady_clock::now();
@ -675,7 +662,7 @@ struct curlFileTransfer : public FileTransfer
throw nix::Error("cannot enqueue download request because the download thread is shutting down"); throw nix::Error("cannot enqueue download request because the download thread is shutting down");
state->incoming.push(item); state->incoming.push(item);
} }
writeFull(wakeupPipe.writeSide.get(), " "); wakeup();
} }
#if ENABLE_S3 #if ENABLE_S3