#include "lix/libstore/filetransfer.hh" #include "lix/libutil/compression.hh" #include #include #include #include #include #include #include #include #include #include #include // local server tests don't work on darwin without some incantations // the horrors do not want to look up. contributions welcome though! #if __APPLE__ #define NOT_ON_DARWIN(n) DISABLED_##n #else #define NOT_ON_DARWIN(n) n #endif using namespace std::chrono_literals; namespace { struct Reply { std::string status, headers; std::function(int)> content; std::list expectedHeaders; Reply( std::string_view status, std::string_view headers, std::function content, std::list expectedHeaders = {} ) : Reply( status, headers, [content](int round) { return round == 0 ? std::optional(content()) : std::nullopt; }, std::move(expectedHeaders) ) { } Reply( std::string_view status, std::string_view headers, std::function(int)> content, std::list expectedHeaders = {} ) : status(status) , headers(headers) , content(content) , expectedHeaders(std::move(expectedHeaders)) { } }; } namespace nix { static std::tuple serveHTTP(std::vector replies) { AutoCloseFD listener(::socket(AF_INET6, SOCK_STREAM, 0)); if (!listener) { throw SysError(errno, "socket() failed"); } Pipe trigger; trigger.create(); sockaddr_in6 addr = { .sin6_family = AF_INET6, .sin6_addr = IN6ADDR_LOOPBACK_INIT, }; socklen_t len = sizeof(addr); if (::bind(listener.get(), reinterpret_cast(&addr), sizeof(addr)) < 0) { throw SysError(errno, "bind() failed"); } if (::getsockname(listener.get(), reinterpret_cast(&addr), &len) < 0) { throw SysError(errno, "getsockname() failed"); } if (::listen(listener.get(), 1) < 0) { throw SysError(errno, "listen() failed"); } std::thread( [replies, at{0}](AutoCloseFD socket, AutoCloseFD trigger) mutable { while (true) { pollfd pfds[2] = { { .fd = socket.get(), .events = POLLIN, }, { .fd = trigger.get(), .events = POLLHUP, }, }; if (::poll(pfds, 2, -1) <= 0) { throw SysError(errno, "poll() failed"); } if (pfds[1].revents & POLLHUP) { return; } if (!(pfds[0].revents & POLLIN)) { continue; } AutoCloseFD conn(::accept(socket.get(), nullptr, nullptr)); if (!conn) { throw SysError(errno, "accept() failed"); } const auto & reply = replies[at++ % replies.size()]; std::thread([=, conn{std::move(conn)}] { auto send = [&](std::string_view bit) { while (!bit.empty()) { auto written = ::send(conn.get(), bit.data(), bit.size(), MSG_NOSIGNAL); if (written < 0) { debug("send() failed: %s", strerror(errno)); return; } bit.remove_prefix(written); } }; send("HTTP/1.1 "); send(reply.status); send("\r\n"); std::string requestWithHeaders; while (true) { char c; if (recv(conn.get(), &c, 1, MSG_NOSIGNAL) != 1) { debug("recv() failed for headers: %s", strerror(errno)); return; } requestWithHeaders += c; if (requestWithHeaders.ends_with("\r\n\r\n")) { requestWithHeaders.resize(requestWithHeaders.size() - 2); break; } } debug("got request:\n%s", requestWithHeaders); for (auto & expected : reply.expectedHeaders) { ASSERT_TRUE(requestWithHeaders.contains(fmt("%s\r\n", expected))); } send(reply.headers); send("\r\n"); for (int round = 0; ; round++) { if (auto content = reply.content(round); content.has_value()) { send(*content); } else { break; } } ::shutdown(conn.get(), SHUT_WR); for (;;) { char buf[1]; switch (recv(conn.get(), buf, 1, MSG_NOSIGNAL)) { case 0: return; // remote closed case 1: continue; // connection still held open by remote default: debug("recv() failed: %s", strerror(errno)); return; } } }).detach(); } }, std::move(listener), std::move(trigger.readSide) ) .detach(); return { ntohs(addr.sin6_port), std::move(trigger.writeSide), }; } static std::tuple serveHTTP(std::string status, std::string headers, std::function content) { return serveHTTP({{{status, headers, content}}}); } TEST(FileTransfer, exceptionAbortsDownload) { struct Done : std::exception {}; auto ft = makeFileTransfer(); LambdaSink broken([](auto block) { throw Done(); }); auto [port, srv] = serveHTTP({{"200 ok", "", [](int) { return "foo"; }}}); ASSERT_THROW(ft->download(fmt("http://[::1]:%d/index", port)).second->drainInto(broken), Done); // makeFileTransfer returns a ref<>, which cannot be cleared. since we also // can't default-construct it we'll have to overwrite it instead, but we'll // take the raw pointer out first so we can destroy it in a detached thread // (otherwise a failure will stall the process and have it killed by meson) auto reset = std::async(std::launch::async, [&]() { ft = makeFileTransfer(); }); EXPECT_EQ(reset.wait_for(10s), std::future_status::ready); // if this did time out we have to leak `reset`. if (reset.wait_for(0s) == std::future_status::timeout) { (void) new auto(std::move(reset)); } } TEST(FileTransfer, exceptionAbortsRead) { auto [port, srv] = serveHTTP("200 ok", "content-length: 0\r\n", [] { return ""; }); auto ft = makeFileTransfer(); char buf[10] = ""; ASSERT_THROW(ft->download(fmt("http://[::1]:%d/index", port)).second->read(buf, 10), EndOfFile); } TEST(FileTransfer, NOT_ON_DARWIN(reportsSetupErrors)) { auto [port, srv] = serveHTTP("404 not found", "", [] { return ""; }); auto ft = makeFileTransfer(); ASSERT_THROW( ft->download(fmt("http://[::1]:%d/index", port)), FileTransferError ); } TEST(FileTransfer, NOT_ON_DARWIN(defersFailures)) { auto [port, srv] = serveHTTP("200 ok", "content-length: 100000000\r\n", [] { std::this_thread::sleep_for(10ms); // just a bunch of data to fill the curl wrapper buffer, otherwise the // initial wait for header data will also wait for the the response to // complete (the source is only woken when curl returns data, and curl // might only do so once its internal buffer has already been filled.) return std::string(1024 * 1024, ' '); }); auto ft = makeFileTransfer(0); auto src = ft->download(fmt("http://[::1]:%d/index", port)).second; ASSERT_THROW(src->drain(), FileTransferError); } TEST(FileTransfer, NOT_ON_DARWIN(handlesContentEncoding)) { std::string original = "Test data string"; std::string compressed = compress("gzip", original); auto [port, srv] = serveHTTP("200 ok", "content-encoding: gzip\r\n", [&] { return compressed; }); auto ft = makeFileTransfer(); StringSink sink; ft->download(fmt("http://[::1]:%d/index", port)).second->drainInto(sink); EXPECT_EQ(sink.s, original); } TEST(FileTransfer, usesIntermediateLinkHeaders) { auto [port, srv] = serveHTTP({ {"301 ok", "location: /second\r\n" "content-length: 0\r\n", [] { return ""; }}, {"307 ok", "location: /third\r\n" "content-length: 0\r\n", [] { return ""; }}, {"307 ok", "location: /fourth\r\n" "link: ; rel=\"immutable\"\r\n" "content-length: 0\r\n", [] { return ""; }}, {"200 ok", "content-length: 1\r\n", [] { return "a"; }}, }); auto ft = makeFileTransfer(0); auto [result, _data] = ft->download(fmt("http://[::1]:%d/first", port)); ASSERT_EQ(result.immutableUrl, "http://foo"); } TEST(FileTransfer, stalledReaderDoesntBlockOthers) { auto [port, srv] = serveHTTP({ {"200 ok", "content-length: 100000000\r\n", [](int round) mutable { return round < 100 ? std::optional(std::string(1'000'000, ' ')) : std::nullopt; }}, }); auto ft = makeFileTransfer(0); auto [_result1, data1] = ft->download(fmt("http://[::1]:%d", port)); auto [_result2, data2] = ft->download(fmt("http://[::1]:%d", port)); auto drop = [](Source & source, size_t size) { char buf[1000]; while (size > 0) { auto round = std::min(size, sizeof(buf)); source(buf, round); size -= round; } }; // read 10M of each of the 100M, then the rest. neither reader should // block the other, nor should it take that long to copy 200MB total. drop(*data1, 10'000'000); drop(*data2, 10'000'000); drop(*data1, 90'000'000); drop(*data2, 90'000'000); ASSERT_THROW(drop(*data1, 1), EndOfFile); ASSERT_THROW(drop(*data2, 1), EndOfFile); } TEST(FileTransfer, retries) { auto [port, srv] = serveHTTP({ // transient setup failure {"429 try again later", "content-length: 0\r\n", [] { return ""; }}, // transient transfer failure (simulates a connection break) {"200 ok", "content-length: 2\r\n" "accept-ranges: bytes\r\n", [] { return "a"; }}, // wrapper should ask for remaining data now {"200 ok", "content-length: 1\r\n" "content-range: bytes 1-1/2\r\n", [] { return "b"; }, {"Range: bytes=1-"}}, }); auto ft = makeFileTransfer(0); auto [result, data] = ft->download(fmt("http://[::1]:%d", port)); ASSERT_EQ(data->drain(), "ab"); } TEST(FileTransfer, doesntRetrySetupForever) { auto [port, srv] = serveHTTP({ {"429 try again later", "content-length: 0\r\n", [] { return ""; }}, }); auto ft = makeFileTransfer(0); ASSERT_THROW(ft->download(fmt("http://[::1]:%d", port)), FileTransferError); } TEST(FileTransfer, doesntRetryTransferForever) { constexpr size_t LIMIT = 20; ASSERT_LT(fileTransferSettings.tries, LIMIT); // just to keep test runtime low std::vector replies; for (size_t i = 0; i < LIMIT; i++) { replies.emplace_back( "200 ok", fmt("content-length: %1%\r\n" "accept-ranges: bytes\r\n" "content-range: bytes %2%-%3%/%3%\r\n", LIMIT - i, i, LIMIT), [] { return "a"; } ); } auto [port, srv] = serveHTTP(replies); auto ft = makeFileTransfer(0); ASSERT_THROW(ft->download(fmt("http://[::1]:%d", port)).second->drain(), FileTransferError); } TEST(FileTransfer, doesntRetryUploads) { auto ft = makeFileTransfer(0); { auto [port, srv] = serveHTTP({ {"429 try again later", "", [] { return ""; }}, {"200 ok", "", [] { return ""; }}, }); ASSERT_THROW(ft->upload(fmt("http://[::1]:%d", port), ""), FileTransferError); } { auto [port, srv] = serveHTTP({ {"429 try again later", "", [] { return ""; }}, {"200 ok", "", [] { return ""; }}, }); ASSERT_THROW(ft->upload(fmt("http://[::1]:%d", port), "foo"), FileTransferError); } } }