forked from lix-project/lix
filetransfer: abort transfer on receiver exception
not doing this will cause transfers that had their readers disappear to
linger. with lingering transfers the curl thread can't shut down, which
will cause nix itself to not shut down until the transfer finishes some
other way (most likely network timeouts). also add a new test for this.
Change-Id: Id2401b3ac85731c824db05918d4079125be25b57
This commit is contained in:
parent
03a20ef1ff
commit
6b08138929
3 changed files with 42 additions and 7 deletions
|
@ -708,7 +708,7 @@ struct curlFileTransfer : public FileTransfer
|
||||||
download thread and the calling thread. */
|
download thread and the calling thread. */
|
||||||
|
|
||||||
struct State {
|
struct State {
|
||||||
bool quit = false;
|
bool done = false, failed = false;
|
||||||
std::exception_ptr exc;
|
std::exception_ptr exc;
|
||||||
std::string data;
|
std::string data;
|
||||||
std::condition_variable avail, request;
|
std::condition_variable avail, request;
|
||||||
|
@ -717,18 +717,17 @@ struct curlFileTransfer : public FileTransfer
|
||||||
|
|
||||||
auto _state = std::make_shared<Sync<State>>();
|
auto _state = std::make_shared<Sync<State>>();
|
||||||
|
|
||||||
/* In case of an exception, wake up the download thread. FIXME:
|
/* In case of an exception, wake up the download thread. */
|
||||||
abort the download request. */
|
|
||||||
Finally finally([&]() {
|
Finally finally([&]() {
|
||||||
auto state(_state->lock());
|
auto state(_state->lock());
|
||||||
state->quit = true;
|
state->failed |= std::uncaught_exceptions() != 0;
|
||||||
state->request.notify_one();
|
state->request.notify_one();
|
||||||
});
|
});
|
||||||
|
|
||||||
enqueueFileTransfer(request,
|
enqueueFileTransfer(request,
|
||||||
{[_state](std::future<FileTransferResult> fut) {
|
{[_state](std::future<FileTransferResult> fut) {
|
||||||
auto state(_state->lock());
|
auto state(_state->lock());
|
||||||
state->quit = true;
|
state->done = true;
|
||||||
try {
|
try {
|
||||||
fut.get();
|
fut.get();
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
|
@ -740,7 +739,10 @@ struct curlFileTransfer : public FileTransfer
|
||||||
[_state, &sink](TransferItem & transfer, std::string_view data) {
|
[_state, &sink](TransferItem & transfer, std::string_view data) {
|
||||||
auto state(_state->lock());
|
auto state(_state->lock());
|
||||||
|
|
||||||
if (state->quit) return;
|
if (state->failed) {
|
||||||
|
// actual exception doesn't matter, the other end is already dead
|
||||||
|
throw std::exception{};
|
||||||
|
}
|
||||||
|
|
||||||
if (!state->decompressor) {
|
if (!state->decompressor) {
|
||||||
state->decompressor = makeDecompressionSink(transfer.encoding, sink);
|
state->decompressor = makeDecompressionSink(transfer.encoding, sink);
|
||||||
|
@ -775,7 +777,7 @@ struct curlFileTransfer : public FileTransfer
|
||||||
|
|
||||||
if (state->data.empty()) {
|
if (state->data.empty()) {
|
||||||
|
|
||||||
if (state->quit) {
|
if (state->done) {
|
||||||
if (state->exc) std::rethrow_exception(state->exc);
|
if (state->exc) std::rethrow_exception(state->exc);
|
||||||
if (state->decompressor) {
|
if (state->decompressor) {
|
||||||
state->decompressor->finish();
|
state->decompressor->finish();
|
||||||
|
|
32
tests/unit/libstore/filetransfer.cc
Normal file
32
tests/unit/libstore/filetransfer.cc
Normal file
|
@ -0,0 +1,32 @@
|
||||||
|
#include "filetransfer.hh"
|
||||||
|
|
||||||
|
#include <future>
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
|
using namespace std::chrono_literals;
|
||||||
|
|
||||||
|
namespace nix {
|
||||||
|
|
||||||
|
TEST(FileTransfer, exceptionAbortsDownload)
|
||||||
|
{
|
||||||
|
struct Done
|
||||||
|
{};
|
||||||
|
|
||||||
|
auto ft = makeFileTransfer();
|
||||||
|
|
||||||
|
LambdaSink broken([](auto block) { throw Done(); });
|
||||||
|
|
||||||
|
ASSERT_THROW(ft->download(FileTransferRequest("file:///dev/zero"), broken), Done);
|
||||||
|
|
||||||
|
// makeFileTransfer returns a ref<>, which cannot be cleared. since we also
|
||||||
|
// can't default-construct it we'll have to overwrite it instead, but we'll
|
||||||
|
// take the raw pointer out first so we can destroy it in a detached thread
|
||||||
|
// (otherwise a failure will stall the process and have it killed by meson)
|
||||||
|
auto reset = std::async(std::launch::async, [&]() { ft = makeFileTransfer(); });
|
||||||
|
EXPECT_EQ(reset.wait_for(10s), std::future_status::ready);
|
||||||
|
// if this did time out we have to leak `reset`.
|
||||||
|
if (reset.wait_for(0s) == std::future_status::timeout) {
|
||||||
|
(void) new auto(std::move(reset));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -113,6 +113,7 @@ libstore_tests_sources = files(
|
||||||
'libstore/derivation.cc',
|
'libstore/derivation.cc',
|
||||||
'libstore/derived-path.cc',
|
'libstore/derived-path.cc',
|
||||||
'libstore/downstream-placeholder.cc',
|
'libstore/downstream-placeholder.cc',
|
||||||
|
'libstore/filetransfer.cc',
|
||||||
'libstore/machines.cc',
|
'libstore/machines.cc',
|
||||||
'libstore/nar-info-disk-cache.cc',
|
'libstore/nar-info-disk-cache.cc',
|
||||||
'libstore/outputs-spec.cc',
|
'libstore/outputs-spec.cc',
|
||||||
|
|
Loading…
Reference in a new issue