diff --git a/src/libstore/daemon.cc b/src/libstore/daemon.cc index 2e068992b..010d3bc2d 100644 --- a/src/libstore/daemon.cc +++ b/src/libstore/daemon.cc @@ -748,59 +748,12 @@ static void performOp(TunnelLogger * logger, ref store, info.ultimate = false; if (GET_PROTOCOL_MINOR(clientVersion) >= 23) { - - struct FramedSource : Source - { - Source & from; - bool eof = false; - std::vector pending; - size_t pos = 0; - - FramedSource(Source & from) : from(from) - { } - - ~FramedSource() - { - if (!eof) { - while (true) { - auto n = readInt(from); - if (!n) break; - std::vector data(n); - from(data.data(), n); - } - } - } - - size_t read(unsigned char * data, size_t len) override - { - if (eof) throw EndOfFile("reached end of FramedSource"); - - if (pos >= pending.size()) { - size_t len = readInt(from); - if (!len) { - eof = true; - return 0; - } - pending = std::vector(len); - pos = 0; - from(pending.data(), len); - } - - auto n = std::min(len, pending.size() - pos); - memcpy(data, pending.data() + pos, n); - pos += n; - return n; - } - }; - logger->startWork(); - { FramedSource source(from); store->addToStore(info, source, (RepairFlag) repair, dontCheckSigs ? NoCheckSigs : CheckSigs); } - logger->stopWork(); } diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index e92b94975..993b05dc0 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -306,6 +306,8 @@ struct ConnectionHandle std::rethrow_exception(ex); } } + + void withFramedSink(std::function fun); }; @@ -564,78 +566,9 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source, << repair << !checkSigs; if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 23) { - - conn->to.flush(); - - std::exception_ptr ex; - - struct FramedSink : BufferedSink - { - ConnectionHandle & conn; - std::exception_ptr & ex; - - FramedSink(ConnectionHandle & conn, std::exception_ptr & ex) : conn(conn), ex(ex) - { } - - ~FramedSink() - { - try { - conn->to << 0; - conn->to.flush(); - } catch (...) { - ignoreException(); - } - } - - void write(const unsigned char * data, size_t len) override - { - /* Don't send more data if the remote has - encountered an error. */ - if (ex) { - auto ex2 = ex; - ex = nullptr; - std::rethrow_exception(ex2); - } - conn->to << len; - conn->to(data, len); - }; - }; - - /* Handle log messages / exceptions from the remote on a - separate thread. */ - std::thread stderrThread([&]() - { - try { - conn.processStderr(nullptr, nullptr, false); - } catch (...) { - ex = std::current_exception(); - } - }); - - Finally joinStderrThread([&]() - { - if (stderrThread.joinable()) { - stderrThread.join(); - if (ex) { - try { - std::rethrow_exception(ex); - } catch (...) { - ignoreException(); - } - } - } - }); - - { - FramedSink sink(conn, ex); + conn.withFramedSink([&](Sink & sink) { copyNAR(source, sink); - sink.flush(); - } - - stderrThread.join(); - if (ex) - std::rethrow_exception(ex); - + }); } else if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21) { conn.processStderr(0, &source); } else { @@ -992,6 +925,82 @@ std::exception_ptr RemoteStore::Connection::processStderr(Sink * sink, Source * return nullptr; } + +struct FramedSink : nix::BufferedSink +{ + ConnectionHandle & conn; + std::exception_ptr & ex; + + FramedSink(ConnectionHandle & conn, std::exception_ptr & ex) : conn(conn), ex(ex) + { } + + ~FramedSink() + { + try { + conn->to << 0; + conn->to.flush(); + } catch (...) { + ignoreException(); + } + } + + void write(const unsigned char * data, size_t len) override + { + /* Don't send more data if the remote has + encountered an error. */ + if (ex) { + auto ex2 = ex; + ex = nullptr; + std::rethrow_exception(ex2); + } + conn->to << len; + conn->to(data, len); + }; +}; + +void +ConnectionHandle::withFramedSink(std::function fun) { + (*this)->to.flush(); + + std::exception_ptr ex; + + /* Handle log messages / exceptions from the remote on a + separate thread. */ + std::thread stderrThread([&]() + { + try { + processStderr(nullptr, nullptr, false); + } catch (...) { + ex = std::current_exception(); + } + }); + + Finally joinStderrThread([&]() + { + if (stderrThread.joinable()) { + stderrThread.join(); + if (ex) { + try { + std::rethrow_exception(ex); + } catch (...) { + ignoreException(); + } + } + } + }); + + { + FramedSink sink(*this, ex); + fun(sink); + sink.flush(); + } + + stderrThread.join(); + if (ex) + std::rethrow_exception(ex); + +} + static RegisterStoreImplementation regStore; } diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh index 2fd06bb4d..6027d5961 100644 --- a/src/libutil/serialise.hh +++ b/src/libutil/serialise.hh @@ -406,4 +406,51 @@ struct StreamToSourceAdapter : Source }; +/* Like SizedSource, but guarantees that the whole frame is consumed after + destruction. This ensures that the original stream is in a known state. */ +struct FramedSource : Source +{ + Source & from; + bool eof = false; + std::vector pending; + size_t pos = 0; + + FramedSource(Source & from) : from(from) + { } + + ~FramedSource() + { + if (!eof) { + while (true) { + auto n = readInt(from); + if (!n) break; + std::vector data(n); + from(data.data(), n); + } + } + } + + size_t read(unsigned char * data, size_t len) override + { + if (eof) throw EndOfFile("reached end of FramedSource"); + + if (pos >= pending.size()) { + size_t len = readInt(from); + if (!len) { + eof = true; + return 0; + } + pending = std::vector(len); + pos = 0; + from(pending.data(), len); + } + + auto n = std::min(len, pending.size() - pos); + memcpy(data, pending.data() + pos, n); + pos += n; + return n; + } +}; + + }