filetransfer: make two-arg download abstract

this lets each implementation of FileTransfer (of which currently only
the one exists at all) implement appropriate handling for its internal
behaviours that are not otherwise exposed. in curl this lets us switch
the buffer-full handling method from "block the entire curl thread" to
"pause just the one transfer", move the non-libcurl body decompression
out of the actual curl wrapper (which will let us eventually morph the
curl wrapper intto an actual source of Sources), and some other things

Change-Id: Id6d3593cde6b4915aab3e90a43b175c103cc3f18
This commit is contained in:
eldritch horrors 2024-04-25 16:04:49 +02:00
parent ce76d3eab2
commit dfe3baea12
2 changed files with 100 additions and 100 deletions

View file

@ -696,6 +696,105 @@ struct curlFileTransfer : public FileTransfer
enqueueItem(std::make_shared<TransferItem>(*this, request, std::move(callback)));
}
void download(FileTransferRequest && request, Sink & sink) override
{
/* Note: we can't call 'sink' via request.dataCallback, because
that would cause the sink to execute on the fileTransfer
thread. If 'sink' is a coroutine, this will fail. Also, if the
sink is expensive (e.g. one that does decompression and writing
to the Nix store), it would stall the download thread too much.
Therefore we use a buffer to communicate data between the
download thread and the calling thread. */
struct State {
bool quit = false;
std::exception_ptr exc;
std::string data;
std::condition_variable avail, request;
};
auto _state = std::make_shared<Sync<State>>();
/* In case of an exception, wake up the download thread. FIXME:
abort the download request. */
Finally finally([&]() {
auto state(_state->lock());
state->quit = true;
state->request.notify_one();
});
request.dataCallback = [_state](std::string_view data) {
auto state(_state->lock());
if (state->quit) return;
/* If the buffer is full, then go to sleep until the calling
thread wakes us up (i.e. when it has removed data from the
buffer). We don't wait forever to prevent stalling the
download thread. (Hopefully sleeping will throttle the
sender.) */
if (state->data.size() > 1024 * 1024) {
debug("download buffer is full; going to sleep");
state.wait_for(state->request, std::chrono::seconds(10));
}
/* Append data to the buffer and wake up the calling
thread. */
state->data.append(data);
state->avail.notify_one();
};
enqueueFileTransfer(request,
{[_state](std::future<FileTransferResult> fut) {
auto state(_state->lock());
state->quit = true;
try {
fut.get();
} catch (...) {
state->exc = std::current_exception();
}
state->avail.notify_one();
state->request.notify_one();
}});
while (true) {
checkInterrupt();
std::string chunk;
/* Grab data if available, otherwise wait for the download
thread to wake us up. */
{
auto state(_state->lock());
if (state->data.empty()) {
if (state->quit) {
if (state->exc) std::rethrow_exception(state->exc);
return;
}
state.wait(state->avail);
if (state->data.empty()) continue;
}
chunk = std::move(state->data);
/* Reset state->data after the move, since we check data.empty() */
state->data = "";
state->request.notify_one();
}
/* Flush the data to the sink and wake up the download thread
if it's blocked on a full buffer. We don't hold the state
lock while doing this to prevent blocking the download
thread if sink() takes a long time. */
sink(chunk);
}
}
};
ref<curlFileTransfer> makeCurlFileTransfer()
@ -743,105 +842,6 @@ FileTransferResult FileTransfer::upload(const FileTransferRequest & request)
return enqueueFileTransfer(request).get();
}
void FileTransfer::download(FileTransferRequest && request, Sink & sink)
{
/* Note: we can't call 'sink' via request.dataCallback, because
that would cause the sink to execute on the fileTransfer
thread. If 'sink' is a coroutine, this will fail. Also, if the
sink is expensive (e.g. one that does decompression and writing
to the Nix store), it would stall the download thread too much.
Therefore we use a buffer to communicate data between the
download thread and the calling thread. */
struct State {
bool quit = false;
std::exception_ptr exc;
std::string data;
std::condition_variable avail, request;
};
auto _state = std::make_shared<Sync<State>>();
/* In case of an exception, wake up the download thread. FIXME:
abort the download request. */
Finally finally([&]() {
auto state(_state->lock());
state->quit = true;
state->request.notify_one();
});
request.dataCallback = [_state](std::string_view data) {
auto state(_state->lock());
if (state->quit) return;
/* If the buffer is full, then go to sleep until the calling
thread wakes us up (i.e. when it has removed data from the
buffer). We don't wait forever to prevent stalling the
download thread. (Hopefully sleeping will throttle the
sender.) */
if (state->data.size() > 1024 * 1024) {
debug("download buffer is full; going to sleep");
state.wait_for(state->request, std::chrono::seconds(10));
}
/* Append data to the buffer and wake up the calling
thread. */
state->data.append(data);
state->avail.notify_one();
};
enqueueFileTransfer(request,
{[_state](std::future<FileTransferResult> fut) {
auto state(_state->lock());
state->quit = true;
try {
fut.get();
} catch (...) {
state->exc = std::current_exception();
}
state->avail.notify_one();
state->request.notify_one();
}});
while (true) {
checkInterrupt();
std::string chunk;
/* Grab data if available, otherwise wait for the download
thread to wake us up. */
{
auto state(_state->lock());
if (state->data.empty()) {
if (state->quit) {
if (state->exc) std::rethrow_exception(state->exc);
return;
}
state.wait(state->avail);
if (state->data.empty()) continue;
}
chunk = std::move(state->data);
/* Reset state->data after the move, since we check data.empty() */
state->data = "";
state->request.notify_one();
}
/* Flush the data to the sink and wake up the download thread
if it's blocked on a full buffer. We don't hold the state
lock while doing this to prevent blocking the download
thread if sink() takes a long time. */
sink(chunk);
}
}
template<typename... Args>
FileTransferError::FileTransferError(FileTransfer::Error error, std::optional<std::string> response, const Args & ... args)
: Error(args...), error(error), response(response)

View file

@ -115,7 +115,7 @@ struct FileTransfer
* Download a file, writing its data to a sink. The sink will be
* invoked on the thread of the caller.
*/
void download(FileTransferRequest && request, Sink & sink);
virtual void download(FileTransferRequest && request, Sink & sink) = 0;
enum Error { NotFound, Forbidden, Misc, Transient, Interrupted };
};