Merge branch 'add-ca-to-store' of https://github.com/hercules-ci/nix

This commit is contained in:
Eelco Dolstra 2020-09-22 11:31:33 +02:00
commit 92ac8df0ec
11 changed files with 417 additions and 258 deletions

View file

@ -2950,14 +2950,6 @@ struct RestrictedStore : public LocalFSStore, public virtual RestrictedStoreConf
goal.addDependency(info.path);
}
StorePath addToStoreFromDump(Source & dump, const string & name,
FileIngestionMethod method = FileIngestionMethod::Recursive, HashType hashAlgo = htSHA256, RepairFlag repair = NoRepair) override
{
auto path = next->addToStoreFromDump(dump, name, method, hashAlgo, repair);
goal.addDependency(path);
return path;
}
StorePath addTextToStore(const string & name, const string & s,
const StorePathSet & references, RepairFlag repair = NoRepair) override
{

View file

@ -37,48 +37,86 @@ std::string renderContentAddress(ContentAddress ca) {
}, ca);
}
ContentAddress parseContentAddress(std::string_view rawCa) {
auto rest = rawCa;
std::string renderContentAddressMethod(ContentAddressMethod cam) {
return std::visit(overloaded {
[](TextHashMethod &th) {
return std::string{"text:"} + printHashType(htSHA256);
},
[](FixedOutputHashMethod &fshm) {
return "fixed:" + makeFileIngestionPrefix(fshm.fileIngestionMethod) + printHashType(fshm.hashType);
}
}, cam);
}
/*
Parses content address strings up to the hash.
*/
static ContentAddressMethod parseContentAddressMethodPrefix(std::string_view & rest) {
std::string_view wholeInput { rest };
std::string_view prefix;
{
auto optPrefix = splitPrefixTo(rest, ':');
if (!optPrefix)
throw UsageError("not a content address because it is not in the form '<prefix>:<rest>': %s", rawCa);
throw UsageError("not a content address because it is not in the form '<prefix>:<rest>': %s", wholeInput);
prefix = *optPrefix;
}
auto parseHashType_ = [&](){
auto hashTypeRaw = splitPrefixTo(rest, ':');
if (!hashTypeRaw)
throw UsageError("content address hash must be in form '<algo>:<hash>', but found: %s", rawCa);
throw UsageError("content address hash must be in form '<algo>:<hash>', but found: %s", wholeInput);
HashType hashType = parseHashType(*hashTypeRaw);
return std::move(hashType);
};
// Switch on prefix
if (prefix == "text") {
// No parsing of the method, "text" only support flat.
// No parsing of the ingestion method, "text" only support flat.
HashType hashType = parseHashType_();
if (hashType != htSHA256)
throw Error("text content address hash should use %s, but instead uses %s",
printHashType(htSHA256), printHashType(hashType));
return TextHash {
.hash = Hash::parseNonSRIUnprefixed(rest, std::move(hashType)),
};
return TextHashMethod {};
} else if (prefix == "fixed") {
// Parse method
auto method = FileIngestionMethod::Flat;
if (splitPrefix(rest, "r:"))
method = FileIngestionMethod::Recursive;
HashType hashType = parseHashType_();
return FixedOutputHash {
.method = method,
.hash = Hash::parseNonSRIUnprefixed(rest, std::move(hashType)),
return FixedOutputHashMethod {
.fileIngestionMethod = method,
.hashType = std::move(hashType),
};
} else
throw UsageError("content address prefix '%s' is unrecognized. Recogonized prefixes are 'text' or 'fixed'", prefix);
};
}
ContentAddress parseContentAddress(std::string_view rawCa) {
auto rest = rawCa;
ContentAddressMethod caMethod = parseContentAddressMethodPrefix(rest);
return std::visit(
overloaded {
[&](TextHashMethod thm) {
return ContentAddress(TextHash {
.hash = Hash::parseNonSRIUnprefixed(rest, htSHA256)
});
},
[&](FixedOutputHashMethod fohMethod) {
return ContentAddress(FixedOutputHash {
.method = fohMethod.fileIngestionMethod,
.hash = Hash::parseNonSRIUnprefixed(rest, std::move(fohMethod.hashType)),
});
},
}, caMethod);
}
ContentAddressMethod parseContentAddressMethod(std::string_view caMethod) {
std::string_view asPrefix {std::string{caMethod} + ":"};
return parseContentAddressMethodPrefix(asPrefix);
}
std::optional<ContentAddress> parseContentAddressOpt(std::string_view rawCaOpt) {
return rawCaOpt == "" ? std::optional<ContentAddress> {} : parseContentAddress(rawCaOpt);

View file

@ -55,4 +55,23 @@ std::optional<ContentAddress> parseContentAddressOpt(std::string_view rawCaOpt);
Hash getContentAddressHash(const ContentAddress & ca);
/*
We only have one way to hash text with references, so this is single-value
type is only useful in std::variant.
*/
struct TextHashMethod { };
struct FixedOutputHashMethod {
FileIngestionMethod fileIngestionMethod;
HashType hashType;
};
typedef std::variant<
TextHashMethod,
FixedOutputHashMethod
> ContentAddressMethod;
ContentAddressMethod parseContentAddressMethod(std::string_view rawCaMethod);
std::string renderContentAddressMethod(ContentAddressMethod caMethod);
}

View file

@ -2,7 +2,6 @@
#include "monitor-fd.hh"
#include "worker-protocol.hh"
#include "store-api.hh"
#include "local-store.hh"
#include "finally.hh"
#include "affinity.hh"
#include "archive.hh"
@ -240,6 +239,18 @@ struct ClientSettings
}
};
static void writeValidPathInfo(ref<Store> store, unsigned int clientVersion, Sink & to, std::shared_ptr<const ValidPathInfo> info) {
to << (info->deriver ? store->printStorePath(*info->deriver) : "")
<< info->narHash.to_string(Base16, false);
writeStorePaths(*store, to, info->references);
to << info->registrationTime << info->narSize;
if (GET_PROTOCOL_MINOR(clientVersion) >= 16) {
to << info->ultimate
<< info->sigs
<< renderContentAddress(info->ca);
}
}
static void performOp(TunnelLogger * logger, ref<Store> store,
TrustedFlag trusted, RecursiveFlag recursive, unsigned int clientVersion,
Source & from, BufferedSink & to, unsigned int op)
@ -350,47 +361,83 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
}
case wopAddToStore: {
HashType hashAlgo;
std::string baseName;
FileIngestionMethod method;
{
bool fixed;
uint8_t recursive;
std::string hashAlgoRaw;
from >> baseName >> fixed /* obsolete */ >> recursive >> hashAlgoRaw;
if (recursive > (uint8_t) FileIngestionMethod::Recursive)
throw Error("unsupported FileIngestionMethod with value of %i; you may need to upgrade nix-daemon", recursive);
method = FileIngestionMethod { recursive };
/* Compatibility hack. */
if (!fixed) {
hashAlgoRaw = "sha256";
method = FileIngestionMethod::Recursive;
if (GET_PROTOCOL_MINOR(clientVersion) >= 25) {
auto name = readString(from);
auto camStr = readString(from);
auto refs = readStorePaths<StorePathSet>(*store, from);
bool repairBool;
from >> repairBool;
auto repair = RepairFlag{repairBool};
logger->startWork();
auto pathInfo = [&]() {
// NB: FramedSource must be out of scope before logger->stopWork();
ContentAddressMethod contentAddressMethod = parseContentAddressMethod(camStr);
FramedSource source(from);
// TODO this is essentially RemoteStore::addCAToStore. Move it up to Store.
return std::visit(overloaded {
[&](TextHashMethod &_) {
// We could stream this by changing Store
std::string contents = source.drain();
auto path = store->addTextToStore(name, contents, refs, repair);
return store->queryPathInfo(path);
},
[&](FixedOutputHashMethod &fohm) {
if (!refs.empty())
throw UnimplementedError("cannot yet have refs with flat or nar-hashed data");
auto path = store->addToStoreFromDump(source, name, fohm.fileIngestionMethod, fohm.hashType, repair);
return store->queryPathInfo(path);
},
}, contentAddressMethod);
}();
logger->stopWork();
to << store->printStorePath(pathInfo->path);
writeValidPathInfo(store, clientVersion, to, pathInfo);
} else {
HashType hashAlgo;
std::string baseName;
FileIngestionMethod method;
{
bool fixed;
uint8_t recursive;
std::string hashAlgoRaw;
from >> baseName >> fixed /* obsolete */ >> recursive >> hashAlgoRaw;
if (recursive > (uint8_t) FileIngestionMethod::Recursive)
throw Error("unsupported FileIngestionMethod with value of %i; you may need to upgrade nix-daemon", recursive);
method = FileIngestionMethod { recursive };
/* Compatibility hack. */
if (!fixed) {
hashAlgoRaw = "sha256";
method = FileIngestionMethod::Recursive;
}
hashAlgo = parseHashType(hashAlgoRaw);
}
hashAlgo = parseHashType(hashAlgoRaw);
StringSink saved;
TeeSource savedNARSource(from, saved);
RetrieveRegularNARSink savedRegular { saved };
if (method == FileIngestionMethod::Recursive) {
/* Get the entire NAR dump from the client and save it to
a string so that we can pass it to
addToStoreFromDump(). */
ParseSink sink; /* null sink; just parse the NAR */
parseDump(sink, savedNARSource);
} else
parseDump(savedRegular, from);
logger->startWork();
if (!savedRegular.regular) throw Error("regular file expected");
// FIXME: try to stream directly from `from`.
StringSource dumpSource { *saved.s };
auto path = store->addToStoreFromDump(dumpSource, baseName, method, hashAlgo);
logger->stopWork();
to << store->printStorePath(path);
}
StringSink saved;
TeeSource savedNARSource(from, saved);
RetrieveRegularNARSink savedRegular { saved };
if (method == FileIngestionMethod::Recursive) {
/* Get the entire NAR dump from the client and save it to
a string so that we can pass it to
addToStoreFromDump(). */
ParseSink sink; /* null sink; just parse the NAR */
parseDump(sink, savedNARSource);
} else
parseDump(savedRegular, from);
logger->startWork();
if (!savedRegular.regular) throw Error("regular file expected");
// FIXME: try to stream directly from `from`.
StringSource dumpSource { *saved.s };
auto path = store->addToStoreFromDump(dumpSource, baseName, method, hashAlgo);
logger->stopWork();
to << store->printStorePath(path);
break;
}
@ -675,15 +722,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
if (info) {
if (GET_PROTOCOL_MINOR(clientVersion) >= 17)
to << 1;
to << (info->deriver ? store->printStorePath(*info->deriver) : "")
<< info->narHash.to_string(Base16, false);
writeStorePaths(*store, to, info->references);
to << info->registrationTime << info->narSize;
if (GET_PROTOCOL_MINOR(clientVersion) >= 16) {
to << info->ultimate
<< info->sigs
<< renderContentAddress(info->ca);
}
writeValidPathInfo(store, clientVersion, to, info);
} else {
assert(GET_PROTOCOL_MINOR(clientVersion) >= 17);
to << 0;
@ -749,59 +788,12 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
info.ultimate = false;
if (GET_PROTOCOL_MINOR(clientVersion) >= 23) {
struct FramedSource : Source
{
Source & from;
bool eof = false;
std::vector<unsigned char> pending;
size_t pos = 0;
FramedSource(Source & from) : from(from)
{ }
~FramedSource()
{
if (!eof) {
while (true) {
auto n = readInt(from);
if (!n) break;
std::vector<unsigned char> data(n);
from(data.data(), n);
}
}
}
size_t read(unsigned char * data, size_t len) override
{
if (eof) throw EndOfFile("reached end of FramedSource");
if (pos >= pending.size()) {
size_t len = readInt(from);
if (!len) {
eof = true;
return 0;
}
pending = std::vector<unsigned char>(len);
pos = 0;
from(pending.data(), len);
}
auto n = std::min(len, pending.size() - pos);
memcpy(data, pending.data() + pos, n);
pos += n;
return n;
}
};
logger->startWork();
{
FramedSource source(from);
store->addToStore(info, source, (RepairFlag) repair,
dontCheckSigs ? NoCheckSigs : CheckSigs);
}
logger->stopWork();
}

View file

@ -307,6 +307,8 @@ struct ConnectionHandle
std::rethrow_exception(ex);
}
}
void withFramedSink(std::function<void(Sink & sink)> fun);
};
@ -420,11 +422,27 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S
}
ref<const ValidPathInfo> RemoteStore::readValidPathInfo(ConnectionHandle & conn, const StorePath & path) {
auto deriver = readString(conn->from);
auto narHash = Hash::parseAny(readString(conn->from), htSHA256);
auto info = make_ref<ValidPathInfo>(path, narHash);
if (deriver != "") info->deriver = parseStorePath(deriver);
info->references = readStorePaths<StorePathSet>(*this, conn->from);
conn->from >> info->registrationTime >> info->narSize;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) {
conn->from >> info->ultimate;
info->sigs = readStrings<StringSet>(conn->from);
info->ca = parseContentAddressOpt(readString(conn->from));
}
return info;
}
void RemoteStore::queryPathInfoUncached(const StorePath & path,
Callback<std::shared_ptr<const ValidPathInfo>> callback) noexcept
{
try {
std::shared_ptr<ValidPathInfo> info;
std::shared_ptr<const ValidPathInfo> info;
{
auto conn(getConnection());
conn->to << wopQueryPathInfo << printStorePath(path);
@ -440,17 +458,7 @@ void RemoteStore::queryPathInfoUncached(const StorePath & path,
bool valid; conn->from >> valid;
if (!valid) throw InvalidPath("path '%s' is not valid", printStorePath(path));
}
auto deriver = readString(conn->from);
auto narHash = Hash::parseAny(readString(conn->from), htSHA256);
info = std::make_shared<ValidPathInfo>(path, narHash);
if (deriver != "") info->deriver = parseStorePath(deriver);
info->references = readStorePaths<StorePathSet>(*this, conn->from);
conn->from >> info->registrationTime >> info->narSize;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) {
conn->from >> info->ultimate;
info->sigs = readStrings<StringSet>(conn->from);
info->ca = parseContentAddressOpt(readString(conn->from));
}
info = readValidPathInfo(conn, path);
}
callback(std::move(info));
} catch (...) { callback.rethrow(); }
@ -525,6 +533,84 @@ std::optional<StorePath> RemoteStore::queryPathFromHashPart(const std::string &
}
ref<const ValidPathInfo> RemoteStore::addCAToStore(Source & dump, const string & name, ContentAddressMethod caMethod, StorePathSet references, RepairFlag repair)
{
auto conn(getConnection());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 25) {
conn->to
<< wopAddToStore
<< name
<< renderContentAddressMethod(caMethod);
writeStorePaths(*this, conn->to, references);
conn->to << repair;
conn.withFramedSink([&](Sink & sink) {
dump.drainInto(sink);
});
auto path = parseStorePath(readString(conn->from));
return readValidPathInfo(conn, path);
}
else {
if (repair) throw Error("repairing is not supported when building through the Nix daemon protocol < 1.25");
std::visit(overloaded {
[&](TextHashMethod thm) -> void {
std::string s = dump.drain();
conn->to << wopAddTextToStore << name << s;
writeStorePaths(*this, conn->to, references);
conn.processStderr();
},
[&](FixedOutputHashMethod fohm) -> void {
conn->to
<< wopAddToStore
<< name
<< ((fohm.hashType == htSHA256 && fohm.fileIngestionMethod == FileIngestionMethod::Recursive) ? 0 : 1) /* backwards compatibility hack */
<< (fohm.fileIngestionMethod == FileIngestionMethod::Recursive ? 1 : 0)
<< printHashType(fohm.hashType);
try {
conn->to.written = 0;
conn->to.warn = true;
connections->incCapacity();
{
Finally cleanup([&]() { connections->decCapacity(); });
if (fohm.fileIngestionMethod == FileIngestionMethod::Recursive) {
dump.drainInto(conn->to);
} else {
std::string contents = dump.drain();
dumpString(contents, conn->to);
}
}
conn->to.warn = false;
conn.processStderr();
} catch (SysError & e) {
/* Daemon closed while we were sending the path. Probably OOM
or I/O error. */
if (e.errNo == EPIPE)
try {
conn.processStderr();
} catch (EndOfFile & e) { }
throw;
}
}
}, caMethod);
auto path = parseStorePath(readString(conn->from));
return queryPathInfo(path);
}
}
StorePath RemoteStore::addToStoreFromDump(Source & dump, const string & name,
FileIngestionMethod method, HashType hashType, RepairFlag repair)
{
StorePathSet references;
return addCAToStore(dump, name, FixedOutputHashMethod{ .fileIngestionMethod = method, .hashType = hashType }, references, repair)->path;
}
void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
RepairFlag repair, CheckSigsFlag checkSigs)
{
@ -565,78 +651,9 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
<< repair << !checkSigs;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 23) {
conn->to.flush();
std::exception_ptr ex;
struct FramedSink : BufferedSink
{
ConnectionHandle & conn;
std::exception_ptr & ex;
FramedSink(ConnectionHandle & conn, std::exception_ptr & ex) : conn(conn), ex(ex)
{ }
~FramedSink()
{
try {
conn->to << 0;
conn->to.flush();
} catch (...) {
ignoreException();
}
}
void write(const unsigned char * data, size_t len) override
{
/* Don't send more data if the remote has
encountered an error. */
if (ex) {
auto ex2 = ex;
ex = nullptr;
std::rethrow_exception(ex2);
}
conn->to << len;
conn->to(data, len);
};
};
/* Handle log messages / exceptions from the remote on a
separate thread. */
std::thread stderrThread([&]()
{
try {
conn.processStderr(nullptr, nullptr, false);
} catch (...) {
ex = std::current_exception();
}
});
Finally joinStderrThread([&]()
{
if (stderrThread.joinable()) {
stderrThread.join();
if (ex) {
try {
std::rethrow_exception(ex);
} catch (...) {
ignoreException();
}
}
}
});
{
FramedSink sink(conn, ex);
conn.withFramedSink([&](Sink & sink) {
copyNAR(source, sink);
sink.flush();
}
stderrThread.join();
if (ex)
std::rethrow_exception(ex);
});
} else if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21) {
conn.processStderr(0, &source);
} else {
@ -647,57 +664,11 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
}
StorePath RemoteStore::addToStore(const string & name, const Path & _srcPath,
FileIngestionMethod method, HashType hashAlgo, PathFilter & filter, RepairFlag repair)
{
if (repair) throw Error("repairing is not supported when building through the Nix daemon");
auto conn(getConnection());
Path srcPath(absPath(_srcPath));
conn->to
<< wopAddToStore
<< name
<< ((hashAlgo == htSHA256 && method == FileIngestionMethod::Recursive) ? 0 : 1) /* backwards compatibility hack */
<< (method == FileIngestionMethod::Recursive ? 1 : 0)
<< printHashType(hashAlgo);
try {
conn->to.written = 0;
conn->to.warn = true;
connections->incCapacity();
{
Finally cleanup([&]() { connections->decCapacity(); });
dumpPath(srcPath, conn->to, filter);
}
conn->to.warn = false;
conn.processStderr();
} catch (SysError & e) {
/* Daemon closed while we were sending the path. Probably OOM
or I/O error. */
if (e.errNo == EPIPE)
try {
conn.processStderr();
} catch (EndOfFile & e) { }
throw;
}
return parseStorePath(readString(conn->from));
}
StorePath RemoteStore::addTextToStore(const string & name, const string & s,
const StorePathSet & references, RepairFlag repair)
{
if (repair) throw Error("repairing is not supported when building through the Nix daemon");
auto conn(getConnection());
conn->to << wopAddTextToStore << name << s;
writeStorePaths(*this, conn->to, references);
conn.processStderr();
return parseStorePath(readString(conn->from));
StringSource source(s);
return addCAToStore(source, name, TextHashMethod{}, references, repair)->path;
}
@ -993,6 +964,49 @@ std::exception_ptr RemoteStore::Connection::processStderr(Sink * sink, Source *
return nullptr;
}
void
ConnectionHandle::withFramedSink(std::function<void(Sink &sink)> fun) {
(*this)->to.flush();
std::exception_ptr ex;
/* Handle log messages / exceptions from the remote on a
separate thread. */
std::thread stderrThread([&]()
{
try {
processStderr(nullptr, nullptr, false);
} catch (...) {
ex = std::current_exception();
}
});
Finally joinStderrThread([&]()
{
if (stderrThread.joinable()) {
stderrThread.join();
if (ex) {
try {
std::rethrow_exception(ex);
} catch (...) {
ignoreException();
}
}
}
});
{
FramedSink sink((*this)->to, ex);
fun(sink);
sink.flush();
}
stderrThread.join();
if (ex)
std::rethrow_exception(ex);
}
static RegisterStoreImplementation<UDSRemoteStore, UDSRemoteStoreConfig> regStore;
}

View file

@ -63,13 +63,16 @@ public:
void querySubstitutablePathInfos(const StorePathCAMap & paths,
SubstitutablePathInfos & infos) override;
/* Add a content-addressable store path. `dump` will be drained. */
ref<const ValidPathInfo> addCAToStore(Source & dump, const string & name, ContentAddressMethod caMethod, StorePathSet references, RepairFlag repair);
/* Add a content-addressable store path. Does not support references. `dump` will be drained. */
StorePath addToStoreFromDump(Source & dump, const string & name,
FileIngestionMethod method = FileIngestionMethod::Recursive, HashType hashAlgo = htSHA256, RepairFlag repair = NoRepair) override;
void addToStore(const ValidPathInfo & info, Source & nar,
RepairFlag repair, CheckSigsFlag checkSigs) override;
StorePath addToStore(const string & name, const Path & srcPath,
FileIngestionMethod method = FileIngestionMethod::Recursive, HashType hashAlgo = htSHA256,
PathFilter & filter = defaultPathFilter, RepairFlag repair = NoRepair) override;
StorePath addTextToStore(const string & name, const string & s,
const StorePathSet & references, RepairFlag repair) override;
@ -139,6 +142,8 @@ protected:
virtual void narFromPath(const StorePath & path, Sink & sink) override;
ref<const ValidPathInfo> readValidPathInfo(ConnectionHandle & conn, const StorePath & path);
private:
std::atomic_bool failed{false};

View file

@ -449,7 +449,8 @@ public:
/* Like addToStore(), but the contents of the path are contained
in `dump', which is either a NAR serialisation (if recursive ==
true) or simply the contents of a regular file (if recursive ==
false). */
false).
`dump` may be drained */
// FIXME: remove?
virtual StorePath addToStoreFromDump(Source & dump, const string & name,
FileIngestionMethod method = FileIngestionMethod::Recursive, HashType hashAlgo = htSHA256, RepairFlag repair = NoRepair)

View file

@ -6,7 +6,7 @@ namespace nix {
#define WORKER_MAGIC_1 0x6e697863
#define WORKER_MAGIC_2 0x6478696f
#define PROTOCOL_VERSION 0x118
#define PROTOCOL_VERSION 0x119
#define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00)
#define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff)
@ -18,7 +18,7 @@ typedef enum {
wopQueryReferences = 5, // obsolete
wopQueryReferrers = 6,
wopAddToStore = 7,
wopAddTextToStore = 8,
wopAddTextToStore = 8, // obsolete since 1.25, Nix 3.0. Use wopAddToStore
wopBuildPaths = 9,
wopEnsurePath = 10,
wopAddTempRoot = 11,

View file

@ -93,7 +93,7 @@ void Source::operator () (unsigned char * data, size_t len)
}
std::string Source::drain()
void Source::drainInto(Sink & sink)
{
std::string s;
std::vector<unsigned char> buf(8192);
@ -101,12 +101,19 @@ std::string Source::drain()
size_t n;
try {
n = read(buf.data(), buf.size());
s.append((char *) buf.data(), n);
sink(buf.data(), n);
} catch (EndOfFile &) {
break;
}
}
return s;
}
std::string Source::drain()
{
StringSink s;
drainInto(s);
return *s.s;
}

View file

@ -69,6 +69,8 @@ struct Source
virtual bool good() { return true; }
void drainInto(Sink & sink);
std::string drain();
};
@ -404,4 +406,93 @@ struct StreamToSourceAdapter : Source
};
/* A source that reads a distinct format of concatenated chunks back into its
logical form, in order to guarantee a known state to the original stream,
even in the event of errors.
Use with FramedSink, which also allows the logical stream to be terminated
in the event of an exception.
*/
struct FramedSource : Source
{
Source & from;
bool eof = false;
std::vector<unsigned char> pending;
size_t pos = 0;
FramedSource(Source & from) : from(from)
{ }
~FramedSource()
{
if (!eof) {
while (true) {
auto n = readInt(from);
if (!n) break;
std::vector<unsigned char> data(n);
from(data.data(), n);
}
}
}
size_t read(unsigned char * data, size_t len) override
{
if (eof) throw EndOfFile("reached end of FramedSource");
if (pos >= pending.size()) {
size_t len = readInt(from);
if (!len) {
eof = true;
return 0;
}
pending = std::vector<unsigned char>(len);
pos = 0;
from(pending.data(), len);
}
auto n = std::min(len, pending.size() - pos);
memcpy(data, pending.data() + pos, n);
pos += n;
return n;
}
};
/* Write as chunks in the format expected by FramedSource.
The exception_ptr reference can be used to terminate the stream when you
detect that an error has occurred on the remote end.
*/
struct FramedSink : nix::BufferedSink
{
BufferedSink & to;
std::exception_ptr & ex;
FramedSink(BufferedSink & to, std::exception_ptr & ex) : to(to), ex(ex)
{ }
~FramedSink()
{
try {
to << 0;
to.flush();
} catch (...) {
ignoreException();
}
}
void write(const unsigned char * data, size_t len) override
{
/* Don't send more data if the remote has
encountered an error. */
if (ex) {
auto ex2 = ex;
ex = nullptr;
std::rethrow_exception(ex2);
}
to << len;
to(data, len);
};
};
}

View file

@ -9,7 +9,7 @@ namespace nix {
// If `separator` is found, we return the portion of the string before the
// separator, and modify the string argument to contain only the part after the
// separator. Otherwise, wer return `std::nullopt`, and we leave the argument
// separator. Otherwise, we return `std::nullopt`, and we leave the argument
// string alone.
static inline std::optional<std::string_view> splitPrefixTo(std::string_view & string, char separator) {
auto sepInstance = string.find(separator);