libstore: pause only stalling transfers

don't pause the entire curl thread. we have multiple consumer threads
after all, not just one, so stalling all of them is likely not great.

note that libcurl advises against using transfer pauses if compressed
encodings are allowed and automatically decoded. this should not lead
to problems in practice because our data is usually not compressed to
such a degree that curl buffering *uncompressed* data matters. should
this cause problems we can reintroduce the whole-thread pause, but we
will probably get away with this until the entire file transfer class
is made kj::Promise-using async (and *then* curl can be hardpaused if
it cannot get rid of its data, solving the problem once and for all).

Change-Id: I218e41bfa5a27c7454eafb0bdb54f2a29a7f6493
This commit is contained in:
eldritch horrors 2024-10-29 20:33:42 +01:00
parent c95b73d8a1
commit 4ae6fb5a8f
2 changed files with 119 additions and 28 deletions

View file

@ -64,7 +64,8 @@ struct curlFileTransfer : public FileTransfer
} phase = initialSetup; } phase = initialSetup;
std::promise<FileTransferResult> metadataPromise; std::promise<FileTransferResult> metadataPromise;
std::packaged_task<void(std::exception_ptr)> doneCallback; std::packaged_task<void(std::exception_ptr)> doneCallback;
std::function<void(std::string_view data)> dataCallback; // return false from dataCallback to pause the transfer without consuming data
std::function<bool(std::string_view data)> dataCallback;
CURL * req; // must never be nullptr CURL * req; // must never be nullptr
std::string statusMsg; std::string statusMsg;
@ -104,7 +105,7 @@ struct curlFileTransfer : public FileTransfer
const Headers & headers, const Headers & headers,
ActivityId parentAct, ActivityId parentAct,
std::invocable<std::exception_ptr> auto doneCallback, std::invocable<std::exception_ptr> auto doneCallback,
std::function<void(std::string_view data)> dataCallback, std::function<bool(std::string_view data)> dataCallback,
std::optional<std::string> uploadData, std::optional<std::string> uploadData,
bool noBody bool noBody
) )
@ -198,15 +199,16 @@ struct curlFileTransfer : public FileTransfer
try { try {
maybeFinishSetup(); maybeFinishSetup();
bodySize += realSize;
if (successfulStatuses.count(getHTTPStatus()) && this->dataCallback) { if (successfulStatuses.count(getHTTPStatus()) && this->dataCallback) {
if (!dataCallback({static_cast<const char *>(contents), realSize})) {
return CURL_WRITEFUNC_PAUSE;
}
writtenToSink += realSize; writtenToSink += realSize;
dataCallback({static_cast<const char *>(contents), realSize});
} else { } else {
this->downloadData.append(static_cast<const char *>(contents), realSize); this->downloadData.append(static_cast<const char *>(contents), realSize);
} }
bodySize += realSize;
return realSize; return realSize;
} catch (...) { } catch (...) {
callbackException = std::current_exception(); callbackException = std::current_exception();
@ -501,6 +503,13 @@ struct curlFileTransfer : public FileTransfer
fail(std::move(exc)); fail(std::move(exc));
} }
} }
void unpause()
{
auto lock = fileTransfer.state_.lock();
lock->unpause.push_back(shared_from_this());
fileTransfer.wakeup();
}
}; };
struct State struct State
@ -512,6 +521,7 @@ struct curlFileTransfer : public FileTransfer
}; };
bool quit = false; bool quit = false;
std::priority_queue<std::shared_ptr<TransferItem>, std::vector<std::shared_ptr<TransferItem>>, EmbargoComparator> incoming; std::priority_queue<std::shared_ptr<TransferItem>, std::vector<std::shared_ptr<TransferItem>>, EmbargoComparator> incoming;
std::vector<std::shared_ptr<TransferItem>> unpause;
}; };
Sync<State> state_; Sync<State> state_;
@ -627,6 +637,10 @@ struct curlFileTransfer : public FileTransfer
{ {
auto state(state_.lock()); auto state(state_.lock());
for (auto & item : state->unpause) {
curl_easy_pause(item->req, CURLPAUSE_CONT);
}
state->unpause.clear();
while (!state->incoming.empty()) { while (!state->incoming.empty()) {
auto item = state->incoming.top(); auto item = state->incoming.top();
if (item->embargo <= now) { if (item->embargo <= now) {
@ -826,14 +840,14 @@ struct curlFileTransfer : public FileTransfer
download thread. (Hopefully sleeping will throttle the download thread. (Hopefully sleeping will throttle the
sender.) */ sender.) */
if (state->data.size() > 1024 * 1024) { if (state->data.size() > 1024 * 1024) {
debug("download buffer is full; going to sleep"); return false;
state.wait_for(state->request, std::chrono::seconds(10));
} }
/* Append data to the buffer and wake up the calling /* Append data to the buffer and wake up the calling
thread. */ thread. */
state->data.append(data); state->data.append(data);
state->avail.notify_one(); state->avail.notify_one();
return true;
}, },
std::move(data), std::move(data),
noBody noBody
@ -842,10 +856,17 @@ struct curlFileTransfer : public FileTransfer
struct TransferSource : Source struct TransferSource : Source
{ {
const std::shared_ptr<Sync<State>> _state; const std::shared_ptr<Sync<State>> _state;
std::shared_ptr<TransferItem> transfer;
std::string chunk; std::string chunk;
std::string_view buffered; std::string_view buffered;
explicit TransferSource(const std::shared_ptr<Sync<State>> & state) : _state(state) {} explicit TransferSource(
const std::shared_ptr<Sync<State>> & state, std::shared_ptr<TransferItem> transfer
)
: _state(state)
, transfer(std::move(transfer))
{
}
~TransferSource() ~TransferSource()
{ {
@ -868,6 +889,7 @@ struct curlFileTransfer : public FileTransfer
return; return;
} }
transfer->unpause();
state.wait(state->avail); state.wait(state->avail);
} }
@ -910,7 +932,7 @@ struct curlFileTransfer : public FileTransfer
}; };
auto metadata = item->metadataPromise.get_future().get(); auto metadata = item->metadataPromise.get_future().get();
auto source = make_box_ptr<TransferSource>(_state); auto source = make_box_ptr<TransferSource>(_state, item);
auto lock(_state->lock()); auto lock(_state->lock());
source->awaitData(lock); source->awaitData(lock);
return {std::move(metadata), std::move(source)}; return {std::move(metadata), std::move(source)};

View file

@ -27,9 +27,28 @@ namespace {
struct Reply { struct Reply {
std::string status, headers; std::string status, headers;
std::function<std::string()> content; std::function<std::optional<std::string>(int)> content;
};
Reply(
std::string_view status, std::string_view headers, std::function<std::string()> content
)
: Reply(status, headers, [content](int round) {
return round == 0 ? std::optional(content()) : std::nullopt;
})
{
}
Reply(
std::string_view status,
std::string_view headers,
std::function<std::optional<std::string>(int)> content
)
: status(status)
, headers(headers)
, content(content)
{
}
};
} }
namespace nix { namespace nix {
@ -89,25 +108,44 @@ serveHTTP(std::vector<Reply> replies)
throw SysError(errno, "accept() failed"); throw SysError(errno, "accept() failed");
} }
auto send = [&](std::string_view bit) {
while (!bit.empty()) {
auto written = ::write(conn.get(), bit.data(), bit.size());
if (written < 0) {
throw SysError(errno, "write() failed");
}
bit.remove_prefix(written);
}
};
const auto & reply = replies[at++ % replies.size()]; const auto & reply = replies[at++ % replies.size()];
send("HTTP/1.1 "); std::thread([=, conn{std::move(conn)}] {
send(reply.status); auto send = [&](std::string_view bit) {
send("\r\n"); while (!bit.empty()) {
send(reply.headers); auto written = ::write(conn.get(), bit.data(), bit.size());
send("\r\n"); if (written < 0) {
send(reply.content()); throw SysError(errno, "write() failed");
::shutdown(conn.get(), SHUT_RDWR); }
bit.remove_prefix(written);
}
};
send("HTTP/1.1 ");
send(reply.status);
send("\r\n");
send(reply.headers);
send("\r\n");
for (int round = 0; ; round++) {
if (auto content = reply.content(round); content.has_value()) {
send(*content);
} else {
break;
}
}
::shutdown(conn.get(), SHUT_WR);
for (;;) {
char buf[1];
switch (read(conn.get(), buf, 1)) {
case 0:
return; // remote closed
case 1:
continue; // connection still held open by remote
default:
throw SysError(errno, "read() failed");
}
}
}).detach();
} }
}, },
std::move(listener), std::move(listener),
@ -219,4 +257,35 @@ TEST(FileTransfer, usesIntermediateLinkHeaders)
ASSERT_EQ(result.immutableUrl, "http://foo"); ASSERT_EQ(result.immutableUrl, "http://foo");
} }
TEST(FileTransfer, stalledReaderDoesntBlockOthers)
{
auto [port, srv] = serveHTTP({
{"200 ok",
"content-length: 100000000\r\n",
[](int round) mutable {
return round < 100 ? std::optional(std::string(1'000'000, ' ')) : std::nullopt;
}},
});
auto ft = makeFileTransfer(0);
auto [_result1, data1] = ft->download(fmt("http://[::1]:%d", port));
auto [_result2, data2] = ft->download(fmt("http://[::1]:%d", port));
auto drop = [](Source & source, size_t size) {
char buf[1000];
while (size > 0) {
auto round = std::min(size, sizeof(buf));
source(buf, round);
size -= round;
}
};
// read 10M of each of the 100M, then the rest. neither reader should
// block the other, nor should it take that long to copy 200MB total.
drop(*data1, 10'000'000);
drop(*data2, 10'000'000);
drop(*data1, 90'000'000);
drop(*data2, 90'000'000);
ASSERT_THROW(drop(*data1, 1), EndOfFile);
ASSERT_THROW(drop(*data2, 1), EndOfFile);
}
} }