Move FramedSource and FramedSink, extract withFramedSink

This commit is contained in:
Robert Hensing 2020-09-17 17:36:16 +02:00
parent dfa547c6a8
commit 14b30b3f3d
3 changed files with 127 additions and 118 deletions

View file

@ -748,59 +748,12 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
info.ultimate = false; info.ultimate = false;
if (GET_PROTOCOL_MINOR(clientVersion) >= 23) { if (GET_PROTOCOL_MINOR(clientVersion) >= 23) {
struct FramedSource : Source
{
Source & from;
bool eof = false;
std::vector<unsigned char> pending;
size_t pos = 0;
FramedSource(Source & from) : from(from)
{ }
~FramedSource()
{
if (!eof) {
while (true) {
auto n = readInt(from);
if (!n) break;
std::vector<unsigned char> data(n);
from(data.data(), n);
}
}
}
size_t read(unsigned char * data, size_t len) override
{
if (eof) throw EndOfFile("reached end of FramedSource");
if (pos >= pending.size()) {
size_t len = readInt(from);
if (!len) {
eof = true;
return 0;
}
pending = std::vector<unsigned char>(len);
pos = 0;
from(pending.data(), len);
}
auto n = std::min(len, pending.size() - pos);
memcpy(data, pending.data() + pos, n);
pos += n;
return n;
}
};
logger->startWork(); logger->startWork();
{ {
FramedSource source(from); FramedSource source(from);
store->addToStore(info, source, (RepairFlag) repair, store->addToStore(info, source, (RepairFlag) repair,
dontCheckSigs ? NoCheckSigs : CheckSigs); dontCheckSigs ? NoCheckSigs : CheckSigs);
} }
logger->stopWork(); logger->stopWork();
} }

View file

@ -306,6 +306,8 @@ struct ConnectionHandle
std::rethrow_exception(ex); std::rethrow_exception(ex);
} }
} }
void withFramedSink(std::function<void(Sink & sink)> fun);
}; };
@ -564,78 +566,9 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
<< repair << !checkSigs; << repair << !checkSigs;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 23) { if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 23) {
conn.withFramedSink([&](Sink & sink) {
conn->to.flush();
std::exception_ptr ex;
struct FramedSink : BufferedSink
{
ConnectionHandle & conn;
std::exception_ptr & ex;
FramedSink(ConnectionHandle & conn, std::exception_ptr & ex) : conn(conn), ex(ex)
{ }
~FramedSink()
{
try {
conn->to << 0;
conn->to.flush();
} catch (...) {
ignoreException();
}
}
void write(const unsigned char * data, size_t len) override
{
/* Don't send more data if the remote has
encountered an error. */
if (ex) {
auto ex2 = ex;
ex = nullptr;
std::rethrow_exception(ex2);
}
conn->to << len;
conn->to(data, len);
};
};
/* Handle log messages / exceptions from the remote on a
separate thread. */
std::thread stderrThread([&]()
{
try {
conn.processStderr(nullptr, nullptr, false);
} catch (...) {
ex = std::current_exception();
}
});
Finally joinStderrThread([&]()
{
if (stderrThread.joinable()) {
stderrThread.join();
if (ex) {
try {
std::rethrow_exception(ex);
} catch (...) {
ignoreException();
}
}
}
});
{
FramedSink sink(conn, ex);
copyNAR(source, sink); copyNAR(source, sink);
sink.flush(); });
}
stderrThread.join();
if (ex)
std::rethrow_exception(ex);
} else if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21) { } else if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21) {
conn.processStderr(0, &source); conn.processStderr(0, &source);
} else { } else {
@ -992,6 +925,82 @@ std::exception_ptr RemoteStore::Connection::processStderr(Sink * sink, Source *
return nullptr; return nullptr;
} }
struct FramedSink : nix::BufferedSink
{
ConnectionHandle & conn;
std::exception_ptr & ex;
FramedSink(ConnectionHandle & conn, std::exception_ptr & ex) : conn(conn), ex(ex)
{ }
~FramedSink()
{
try {
conn->to << 0;
conn->to.flush();
} catch (...) {
ignoreException();
}
}
void write(const unsigned char * data, size_t len) override
{
/* Don't send more data if the remote has
encountered an error. */
if (ex) {
auto ex2 = ex;
ex = nullptr;
std::rethrow_exception(ex2);
}
conn->to << len;
conn->to(data, len);
};
};
void
ConnectionHandle::withFramedSink(std::function<void(Sink &sink)> fun) {
(*this)->to.flush();
std::exception_ptr ex;
/* Handle log messages / exceptions from the remote on a
separate thread. */
std::thread stderrThread([&]()
{
try {
processStderr(nullptr, nullptr, false);
} catch (...) {
ex = std::current_exception();
}
});
Finally joinStderrThread([&]()
{
if (stderrThread.joinable()) {
stderrThread.join();
if (ex) {
try {
std::rethrow_exception(ex);
} catch (...) {
ignoreException();
}
}
}
});
{
FramedSink sink(*this, ex);
fun(sink);
sink.flush();
}
stderrThread.join();
if (ex)
std::rethrow_exception(ex);
}
static RegisterStoreImplementation<UDSRemoteStore, UDSRemoteStoreConfig> regStore; static RegisterStoreImplementation<UDSRemoteStore, UDSRemoteStoreConfig> regStore;
} }

View file

@ -406,4 +406,51 @@ struct StreamToSourceAdapter : Source
}; };
/* Like SizedSource, but guarantees that the whole frame is consumed after
destruction. This ensures that the original stream is in a known state. */
struct FramedSource : Source
{
Source & from;
bool eof = false;
std::vector<unsigned char> pending;
size_t pos = 0;
FramedSource(Source & from) : from(from)
{ }
~FramedSource()
{
if (!eof) {
while (true) {
auto n = readInt(from);
if (!n) break;
std::vector<unsigned char> data(n);
from(data.data(), n);
}
}
}
size_t read(unsigned char * data, size_t len) override
{
if (eof) throw EndOfFile("reached end of FramedSource");
if (pos >= pending.size()) {
size_t len = readInt(from);
if (!len) {
eof = true;
return 0;
}
pending = std::vector<unsigned char>(len);
pos = 0;
from(pending.data(), len);
}
auto n = std::min(len, pending.size() - pos);
memcpy(data, pending.data() + pos, n);
pos += n;
return n;
}
};
} }