Move FramedSink next to FramedSource

This commit is contained in:
Robert Hensing 2020-09-17 22:01:35 +02:00
parent ecc8088cb7
commit 8279178b07
2 changed files with 45 additions and 36 deletions

View file

@ -957,39 +957,6 @@ std::exception_ptr RemoteStore::Connection::processStderr(Sink * sink, Source *
return nullptr; return nullptr;
} }
struct FramedSink : nix::BufferedSink
{
ConnectionHandle & conn;
std::exception_ptr & ex;
FramedSink(ConnectionHandle & conn, std::exception_ptr & ex) : conn(conn), ex(ex)
{ }
~FramedSink()
{
try {
conn->to << 0;
conn->to.flush();
} catch (...) {
ignoreException();
}
}
void write(const unsigned char * data, size_t len) override
{
/* Don't send more data if the remote has
encountered an error. */
if (ex) {
auto ex2 = ex;
ex = nullptr;
std::rethrow_exception(ex2);
}
conn->to << len;
conn->to(data, len);
};
};
void void
ConnectionHandle::withFramedSink(std::function<void(Sink &sink)> fun) { ConnectionHandle::withFramedSink(std::function<void(Sink &sink)> fun) {
(*this)->to.flush(); (*this)->to.flush();
@ -1022,7 +989,7 @@ ConnectionHandle::withFramedSink(std::function<void(Sink &sink)> fun) {
}); });
{ {
FramedSink sink(*this, ex); FramedSink sink((*this)->to, ex);
fun(sink); fun(sink);
sink.flush(); sink.flush();
} }

View file

@ -406,8 +406,13 @@ struct StreamToSourceAdapter : Source
}; };
/* Like SizedSource, but guarantees that the whole frame is consumed after /* A source that reads a distinct format of concatenated chunks back into its
destruction. This ensures that the original stream is in a known state. */ logical form, in order to guarantee a known state to the original stream,
even in the event of errors.
Use with FramedSink, which also allows the logical stream to be terminated
in the event of an exception.
*/
struct FramedSource : Source struct FramedSource : Source
{ {
Source & from; Source & from;
@ -452,5 +457,42 @@ struct FramedSource : Source
} }
}; };
/* Write as chunks in the format expected by FramedSource.
The exception_ptr reference can be used to terminate the stream when you
detect that an error has occurred on the remote end.
*/
struct FramedSink : nix::BufferedSink
{
BufferedSink & to;
std::exception_ptr & ex;
FramedSink(BufferedSink & to, std::exception_ptr & ex) : to(to), ex(ex)
{ }
~FramedSink()
{
try {
to << 0;
to.flush();
} catch (...) {
ignoreException();
}
}
void write(const unsigned char * data, size_t len) override
{
/* Don't send more data if the remote has
encountered an error. */
if (ex) {
auto ex2 = ex;
ex = nullptr;
std::rethrow_exception(ex2);
}
to << len;
to(data, len);
};
};
} }