forked from lix-project/lix
filetransfer: return a Source from download()
without this we will not be able to get rid of makeDecompressionSink,
which in turn will be necessary to get rid of sourceToSink (since the
libarchive archive wrapper *must* be a Source due to api limitations)
Change-Id: Iccd3d333ba2cbcab49cb5a1d3125624de16bce27
This commit is contained in:
parent
11f4a5bc7e
commit
c55dcc6c13
|
@ -41,7 +41,7 @@ void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData)
|
||||||
|
|
||||||
auto decompressor = makeDecompressionSink(
|
auto decompressor = makeDecompressionSink(
|
||||||
unpack && mainUrl.ends_with(".xz") ? "xz" : "none", sink);
|
unpack && mainUrl.ends_with(".xz") ? "xz" : "none", sink);
|
||||||
fileTransfer->download(std::move(request), *decompressor);
|
fileTransfer->download(std::move(request))->drainInto(*decompressor);
|
||||||
decompressor->finish();
|
decompressor->finish();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -686,16 +686,8 @@ struct curlFileTransfer : public FileTransfer
|
||||||
->callback.get_future();
|
->callback.get_future();
|
||||||
}
|
}
|
||||||
|
|
||||||
void download(FileTransferRequest && request, Sink & sink) override
|
box_ptr<Source> download(FileTransferRequest && request) override
|
||||||
{
|
{
|
||||||
/* Note: we can't call 'sink' via request.dataCallback, because
|
|
||||||
that would cause the sink to execute on the fileTransfer
|
|
||||||
thread. If 'sink' is a coroutine, this will fail. Also, if the
|
|
||||||
sink is expensive (e.g. one that does decompression and writing
|
|
||||||
to the Nix store), it would stall the download thread too much.
|
|
||||||
Therefore we use a buffer to communicate data between the
|
|
||||||
download thread and the calling thread. */
|
|
||||||
|
|
||||||
struct State {
|
struct State {
|
||||||
bool done = false, failed = false;
|
bool done = false, failed = false;
|
||||||
std::exception_ptr exc;
|
std::exception_ptr exc;
|
||||||
|
@ -705,13 +697,6 @@ struct curlFileTransfer : public FileTransfer
|
||||||
|
|
||||||
auto _state = std::make_shared<Sync<State>>();
|
auto _state = std::make_shared<Sync<State>>();
|
||||||
|
|
||||||
/* In case of an exception, wake up the download thread. */
|
|
||||||
Finally finally([&]() {
|
|
||||||
auto state(_state->lock());
|
|
||||||
state->failed |= std::uncaught_exceptions() != 0;
|
|
||||||
state->request.notify_one();
|
|
||||||
});
|
|
||||||
|
|
||||||
enqueueFileTransfer(
|
enqueueFileTransfer(
|
||||||
request,
|
request,
|
||||||
[_state](std::exception_ptr ex) {
|
[_state](std::exception_ptr ex) {
|
||||||
|
@ -750,50 +735,99 @@ struct curlFileTransfer : public FileTransfer
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
std::unique_ptr<FinishSink> decompressor;
|
struct InnerSource : Source
|
||||||
|
{
|
||||||
while (true) {
|
const std::shared_ptr<Sync<State>> _state;
|
||||||
checkInterrupt();
|
|
||||||
|
|
||||||
std::string chunk;
|
std::string chunk;
|
||||||
|
std::string_view buffered;
|
||||||
|
|
||||||
|
explicit InnerSource(const std::shared_ptr<Sync<State>> & state) : _state(state) {}
|
||||||
|
|
||||||
|
~InnerSource()
|
||||||
|
{
|
||||||
|
// wake up the download thread if it's still going and have it abort
|
||||||
|
auto state(_state->lock());
|
||||||
|
state->failed |= !state->done;
|
||||||
|
state->request.notify_one();
|
||||||
|
}
|
||||||
|
|
||||||
|
void awaitData(Sync<State>::Lock & state)
|
||||||
|
{
|
||||||
/* Grab data if available, otherwise wait for the download
|
/* Grab data if available, otherwise wait for the download
|
||||||
thread to wake us up. */
|
thread to wake us up. */
|
||||||
{
|
while (buffered.empty()) {
|
||||||
auto state(_state->lock());
|
|
||||||
|
|
||||||
if (state->data.empty()) {
|
if (state->data.empty()) {
|
||||||
|
|
||||||
if (state->done) {
|
if (state->done) {
|
||||||
if (state->exc) std::rethrow_exception(state->exc);
|
if (state->exc) {
|
||||||
if (decompressor) {
|
std::rethrow_exception(state->exc);
|
||||||
decompressor->finish();
|
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
state.wait(state->avail);
|
state.wait(state->avail);
|
||||||
|
|
||||||
if (state->data.empty()) continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
chunk = std::move(state->data);
|
chunk = std::move(state->data);
|
||||||
/* Reset state->data after the move, since we check data.empty() */
|
buffered = chunk;
|
||||||
state->data = "";
|
|
||||||
|
|
||||||
if (!decompressor) {
|
|
||||||
decompressor = makeDecompressionSink(state->encoding, sink);
|
|
||||||
}
|
|
||||||
|
|
||||||
state->request.notify_one();
|
state->request.notify_one();
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Flush the data to the sink and wake up the download thread
|
|
||||||
if it's blocked on a full buffer. We don't hold the state
|
|
||||||
lock while doing this to prevent blocking the download
|
|
||||||
thread if sink() takes a long time. */
|
|
||||||
(*decompressor)(chunk);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size_t read(char * data, size_t len) override
|
||||||
|
{
|
||||||
|
auto readPartial = [this](char * data, size_t len) {
|
||||||
|
const auto available = std::min(len, buffered.size());
|
||||||
|
memcpy(data, buffered.data(), available);
|
||||||
|
buffered.remove_prefix(available);
|
||||||
|
return available;
|
||||||
|
};
|
||||||
|
size_t total = readPartial(data, len);
|
||||||
|
|
||||||
|
while (total < len) {
|
||||||
|
{
|
||||||
|
auto state(_state->lock());
|
||||||
|
awaitData(state);
|
||||||
|
}
|
||||||
|
const auto current = readPartial(data + total, len - total);
|
||||||
|
total += current;
|
||||||
|
if (total == 0 || current == 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (total == 0) {
|
||||||
|
throw EndOfFile("download finished");
|
||||||
|
}
|
||||||
|
|
||||||
|
return total;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct DownloadSource : Source
|
||||||
|
{
|
||||||
|
InnerSource inner;
|
||||||
|
std::unique_ptr<Source> decompressor;
|
||||||
|
|
||||||
|
explicit DownloadSource(const std::shared_ptr<Sync<State>> & state) : inner(state) {}
|
||||||
|
|
||||||
|
size_t read(char * data, size_t len) override
|
||||||
|
{
|
||||||
|
checkInterrupt();
|
||||||
|
|
||||||
|
if (!decompressor) {
|
||||||
|
auto state(inner._state->lock());
|
||||||
|
inner.awaitData(state);
|
||||||
|
decompressor = makeDecompressionSource(state->encoding, inner);
|
||||||
|
}
|
||||||
|
|
||||||
|
return decompressor->read(data, len);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
auto source = make_box_ptr<DownloadSource>(_state);
|
||||||
|
auto lock(_state->lock());
|
||||||
|
source->inner.awaitData(lock);
|
||||||
|
return source;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
///@file
|
///@file
|
||||||
|
|
||||||
|
#include "box_ptr.hh"
|
||||||
#include "logging.hh"
|
#include "logging.hh"
|
||||||
#include "serialise.hh"
|
#include "serialise.hh"
|
||||||
#include "types.hh"
|
#include "types.hh"
|
||||||
|
@ -104,10 +105,13 @@ struct FileTransfer
|
||||||
FileTransferResult transfer(const FileTransferRequest & request);
|
FileTransferResult transfer(const FileTransferRequest & request);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Download a file, writing its data to a sink. The sink will be
|
* Download a file, returning its contents through a source. Will not return
|
||||||
* invoked on the thread of the caller.
|
* before the transfer has fully started, ensuring that any errors thrown by
|
||||||
|
* the setup phase (e.g. HTTP 404 or similar errors) are not postponed to be
|
||||||
|
* thrown by the returned source. The source will only throw errors detected
|
||||||
|
* during the transfer itself (decompression errors, connection drops, etc).
|
||||||
*/
|
*/
|
||||||
virtual void download(FileTransferRequest && request, Sink & sink) = 0;
|
virtual box_ptr<Source> download(FileTransferRequest && request) = 0;
|
||||||
|
|
||||||
enum Error { NotFound, Forbidden, Misc, Transient, Interrupted };
|
enum Error { NotFound, Forbidden, Misc, Transient, Interrupted };
|
||||||
};
|
};
|
||||||
|
|
|
@ -155,7 +155,7 @@ protected:
|
||||||
checkEnabled();
|
checkEnabled();
|
||||||
auto request(makeRequest(path));
|
auto request(makeRequest(path));
|
||||||
try {
|
try {
|
||||||
getFileTransfer()->download(std::move(request), sink);
|
getFileTransfer()->download(std::move(request))->drainInto(sink);
|
||||||
} catch (FileTransferError & e) {
|
} catch (FileTransferError & e) {
|
||||||
if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden)
|
if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden)
|
||||||
throw NoSuchBinaryCacheFile("file '%s' does not exist in binary cache '%s'", path, getUri());
|
throw NoSuchBinaryCacheFile("file '%s' does not exist in binary cache '%s'", path, getUri());
|
||||||
|
|
|
@ -98,7 +98,7 @@ std::tuple<StorePath, Hash> prefetchFile(
|
||||||
FdSink sink(fd.get());
|
FdSink sink(fd.get());
|
||||||
|
|
||||||
FileTransferRequest req(url);
|
FileTransferRequest req(url);
|
||||||
getFileTransfer()->download(std::move(req), sink);
|
getFileTransfer()->download(std::move(req))->drainInto(sink);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Optionally unpack the file. */
|
/* Optionally unpack the file. */
|
||||||
|
|
|
@ -7,6 +7,7 @@
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
#include <netinet/in.h>
|
#include <netinet/in.h>
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
|
#include <string>
|
||||||
#include <string_view>
|
#include <string_view>
|
||||||
#include <sys/poll.h>
|
#include <sys/poll.h>
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
|
@ -136,7 +137,7 @@ TEST(FileTransfer, exceptionAbortsDownload)
|
||||||
|
|
||||||
LambdaSink broken([](auto block) { throw Done(); });
|
LambdaSink broken([](auto block) { throw Done(); });
|
||||||
|
|
||||||
ASSERT_THROW(ft->download(FileTransferRequest("file:///dev/zero"), broken), Done);
|
ASSERT_THROW(ft->download(FileTransferRequest("file:///dev/zero"))->drainInto(broken), Done);
|
||||||
|
|
||||||
// makeFileTransfer returns a ref<>, which cannot be cleared. since we also
|
// makeFileTransfer returns a ref<>, which cannot be cleared. since we also
|
||||||
// can't default-construct it we'll have to overwrite it instead, but we'll
|
// can't default-construct it we'll have to overwrite it instead, but we'll
|
||||||
|
@ -159,16 +160,21 @@ TEST(FileTransfer, NOT_ON_DARWIN(reportsSetupErrors))
|
||||||
FileTransferError);
|
FileTransferError);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(FileTransfer, NOT_ON_DARWIN(reportsTransferError))
|
TEST(FileTransfer, NOT_ON_DARWIN(defersFailures))
|
||||||
{
|
{
|
||||||
auto [port, srv] = serveHTTP("200 ok", "content-length: 100\r\n", [] {
|
auto [port, srv] = serveHTTP("200 ok", "content-length: 100000000\r\n", [] {
|
||||||
std::this_thread::sleep_for(10ms);
|
std::this_thread::sleep_for(10ms);
|
||||||
return "";
|
// just a bunch of data to fill the curl wrapper buffer, otherwise the
|
||||||
|
// initial wait for header data will also wait for the the response to
|
||||||
|
// complete (the source is only woken when curl returns data, and curl
|
||||||
|
// might only do so once its internal buffer has already been filled.)
|
||||||
|
return std::string(1024 * 1024, ' ');
|
||||||
});
|
});
|
||||||
auto ft = makeFileTransfer();
|
auto ft = makeFileTransfer();
|
||||||
FileTransferRequest req(fmt("http://[::1]:%d/index", port));
|
FileTransferRequest req(fmt("http://[::1]:%d/index", port));
|
||||||
req.baseRetryTimeMs = 0;
|
req.baseRetryTimeMs = 0;
|
||||||
ASSERT_THROW(ft->transfer(req), FileTransferError);
|
auto src = ft->download(std::move(req));
|
||||||
|
ASSERT_THROW(src->drain(), FileTransferError);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(FileTransfer, NOT_ON_DARWIN(handlesContentEncoding))
|
TEST(FileTransfer, NOT_ON_DARWIN(handlesContentEncoding))
|
||||||
|
@ -180,7 +186,7 @@ TEST(FileTransfer, NOT_ON_DARWIN(handlesContentEncoding))
|
||||||
auto ft = makeFileTransfer();
|
auto ft = makeFileTransfer();
|
||||||
|
|
||||||
StringSink sink;
|
StringSink sink;
|
||||||
ft->download(FileTransferRequest(fmt("http://[::1]:%d/index", port)), sink);
|
ft->download(FileTransferRequest(fmt("http://[::1]:%d/index", port)))->drainInto(sink);
|
||||||
EXPECT_EQ(sink.s, original);
|
EXPECT_EQ(sink.s, original);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue