libstore: generatorize protocol serializers

this is cursed. deeply and profoundly cursed. under NO CIRCUMSTANCES
must protocol serializer helpers be applied to temporaries! doing so
will inevitably cause dangling references and cause the entire thing
to crash. we need to do this even so to get rid of boost coroutines,
and likewise to encapsulate the serializers we suffer today at least
a little bit to allow a gradual migration to an actual IPC protocol.

(this isn't a problem that's unique to generators. c++ coroutines in
general cannot safely take references to arbitrary temporaries since
c++ does not have a lifetime system that can make this safe. -sigh-)

Change-Id: I2921ba451e04d86798752d140885d3c5cc08e146
This commit is contained in:
eldritch horrors 2024-05-18 02:35:34 +02:00
parent 5271424d14
commit a5d1f69841
19 changed files with 170 additions and 154 deletions

View file

@ -1208,7 +1208,7 @@ HookReply DerivationGoal::tryBuildHook()
/* Tell the hook all the inputs that have to be copied to the /* Tell the hook all the inputs that have to be copied to the
remote system. */ remote system. */
CommonProto::write(worker.store, conn, inputPaths); conn.to << CommonProto::write(worker.store, conn, inputPaths);
/* Tell the hooks the missing outputs that have to be copied back /* Tell the hooks the missing outputs that have to be copied back
from the remote system. */ from the remote system. */
@ -1219,7 +1219,7 @@ HookReply DerivationGoal::tryBuildHook()
if (buildMode != bmCheck && status.known && status.known->isValid()) continue; if (buildMode != bmCheck && status.known && status.known->isValid()) continue;
missingOutputs.insert(outputName); missingOutputs.insert(outputName);
} }
CommonProto::write(worker.store, conn, missingOutputs); conn.to << CommonProto::write(worker.store, conn, missingOutputs);
} }
hook->sink = FdSink(); hook->sink = FdSink();

View file

@ -20,9 +20,9 @@ namespace nix {
{ \ { \
return LengthPrefixedProtoHelper<CommonProto, T >::read(store, conn); \ return LengthPrefixedProtoHelper<CommonProto, T >::read(store, conn); \
} \ } \
TEMPLATE void CommonProto::Serialise< T >::write(const Store & store, CommonProto::WriteConn conn, const T & t) \ TEMPLATE [[nodiscard]] WireFormatGenerator CommonProto::Serialise< T >::write(const Store & store, CommonProto::WriteConn conn, const T & t) \
{ \ { \
LengthPrefixedProtoHelper<CommonProto, T >::write(store, conn, t); \ return LengthPrefixedProtoHelper<CommonProto, T >::write(store, conn, t); \
} }
COMMON_USE_LENGTH_PREFIX_SERIALISER(template<typename T>, std::vector<T>) COMMON_USE_LENGTH_PREFIX_SERIALISER(template<typename T>, std::vector<T>)

View file

@ -16,9 +16,9 @@ std::string CommonProto::Serialise<std::string>::read(const Store & store, Commo
return readString(conn.from); return readString(conn.from);
} }
void CommonProto::Serialise<std::string>::write(const Store & store, CommonProto::WriteConn conn, const std::string & str) WireFormatGenerator CommonProto::Serialise<std::string>::write(const Store & store, CommonProto::WriteConn conn, const std::string & str)
{ {
conn.to << str; co_yield str;
} }
@ -27,9 +27,9 @@ StorePath CommonProto::Serialise<StorePath>::read(const Store & store, CommonPro
return store.parseStorePath(readString(conn.from)); return store.parseStorePath(readString(conn.from));
} }
void CommonProto::Serialise<StorePath>::write(const Store & store, CommonProto::WriteConn conn, const StorePath & storePath) WireFormatGenerator CommonProto::Serialise<StorePath>::write(const Store & store, CommonProto::WriteConn conn, const StorePath & storePath)
{ {
conn.to << store.printStorePath(storePath); co_yield store.printStorePath(storePath);
} }
@ -38,9 +38,9 @@ ContentAddress CommonProto::Serialise<ContentAddress>::read(const Store & store,
return ContentAddress::parse(readString(conn.from)); return ContentAddress::parse(readString(conn.from));
} }
void CommonProto::Serialise<ContentAddress>::write(const Store & store, CommonProto::WriteConn conn, const ContentAddress & ca) WireFormatGenerator CommonProto::Serialise<ContentAddress>::write(const Store & store, CommonProto::WriteConn conn, const ContentAddress & ca)
{ {
conn.to << renderContentAddress(ca); co_yield renderContentAddress(ca);
} }
@ -53,9 +53,9 @@ Realisation CommonProto::Serialise<Realisation>::read(const Store & store, Commo
); );
} }
void CommonProto::Serialise<Realisation>::write(const Store & store, CommonProto::WriteConn conn, const Realisation & realisation) WireFormatGenerator CommonProto::Serialise<Realisation>::write(const Store & store, CommonProto::WriteConn conn, const Realisation & realisation)
{ {
conn.to << realisation.toJSON().dump(); co_yield realisation.toJSON().dump();
} }
@ -64,9 +64,9 @@ DrvOutput CommonProto::Serialise<DrvOutput>::read(const Store & store, CommonPro
return DrvOutput::parse(readString(conn.from)); return DrvOutput::parse(readString(conn.from));
} }
void CommonProto::Serialise<DrvOutput>::write(const Store & store, CommonProto::WriteConn conn, const DrvOutput & drvOutput) WireFormatGenerator CommonProto::Serialise<DrvOutput>::write(const Store & store, CommonProto::WriteConn conn, const DrvOutput & drvOutput)
{ {
conn.to << drvOutput.to_string(); co_yield drvOutput.to_string();
} }
@ -76,9 +76,11 @@ std::optional<StorePath> CommonProto::Serialise<std::optional<StorePath>>::read(
return s == "" ? std::optional<StorePath> {} : store.parseStorePath(s); return s == "" ? std::optional<StorePath> {} : store.parseStorePath(s);
} }
void CommonProto::Serialise<std::optional<StorePath>>::write(const Store & store, CommonProto::WriteConn conn, const std::optional<StorePath> & storePathOpt) WireFormatGenerator CommonProto::Serialise<std::optional<StorePath>>::write(const Store & store, CommonProto::WriteConn conn, const std::optional<StorePath> & storePathOpt)
{ {
conn.to << (storePathOpt ? store.printStorePath(*storePathOpt) : ""); return [](std::string s) -> WireFormatGenerator {
co_yield s;
}(storePathOpt ? store.printStorePath(*storePathOpt) : "");
} }
@ -87,9 +89,11 @@ std::optional<ContentAddress> CommonProto::Serialise<std::optional<ContentAddres
return ContentAddress::parseOpt(readString(conn.from)); return ContentAddress::parseOpt(readString(conn.from));
} }
void CommonProto::Serialise<std::optional<ContentAddress>>::write(const Store & store, CommonProto::WriteConn conn, const std::optional<ContentAddress> & caOpt) WireFormatGenerator CommonProto::Serialise<std::optional<ContentAddress>>::write(const Store & store, CommonProto::WriteConn conn, const std::optional<ContentAddress> & caOpt)
{ {
conn.to << (caOpt ? renderContentAddress(*caOpt) : ""); return [](std::string s) -> WireFormatGenerator {
co_yield s;
}(caOpt ? renderContentAddress(*caOpt) : "");
} }
} }

View file

@ -48,9 +48,10 @@ struct CommonProto
* infer the type instead of having to write it down explicitly. * infer the type instead of having to write it down explicitly.
*/ */
template<typename T> template<typename T>
static void write(const Store & store, WriteConn conn, const T & t) [[nodiscard]]
static WireFormatGenerator write(const Store & store, WriteConn conn, const T & t)
{ {
CommonProto::Serialise<T>::write(store, conn, t); return CommonProto::Serialise<T>::write(store, conn, t);
} }
}; };
@ -58,7 +59,7 @@ struct CommonProto
struct CommonProto::Serialise< T > \ struct CommonProto::Serialise< T > \
{ \ { \
static T read(const Store & store, CommonProto::ReadConn conn); \ static T read(const Store & store, CommonProto::ReadConn conn); \
static void write(const Store & store, CommonProto::WriteConn conn, const T & str); \ [[nodiscard]] static WireFormatGenerator write(const Store & store, CommonProto::WriteConn conn, const T & str); \
} }
template<> template<>

View file

@ -291,7 +291,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
} }
auto res = store->queryValidPaths(paths, substitute); auto res = store->queryValidPaths(paths, substitute);
logger->stopWork(); logger->stopWork();
WorkerProto::write(*store, wconn, res); wconn.to << WorkerProto::write(*store, wconn, res);
break; break;
} }
@ -300,7 +300,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
logger->startWork(); logger->startWork();
auto res = store->querySubstitutablePaths(paths); auto res = store->querySubstitutablePaths(paths);
logger->stopWork(); logger->stopWork();
WorkerProto::write(*store, wconn, res); wconn.to << WorkerProto::write(*store, wconn, res);
break; break;
} }
@ -365,7 +365,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
#pragma GCC diagnostic pop #pragma GCC diagnostic pop
logger->stopWork(); logger->stopWork();
WorkerProto::write(*store, wconn, paths); wconn.to << WorkerProto::write(*store, wconn, paths);
break; break;
} }
@ -385,7 +385,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
logger->startWork(); logger->startWork();
auto outputs = store->queryPartialDerivationOutputMap(path); auto outputs = store->queryPartialDerivationOutputMap(path);
logger->stopWork(); logger->stopWork();
WorkerProto::write(*store, wconn, outputs); wconn.to << WorkerProto::write(*store, wconn, outputs);
break; break;
} }
@ -432,7 +432,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
}(); }();
logger->stopWork(); logger->stopWork();
WorkerProto::Serialise<ValidPathInfo>::write(*store, wconn, *pathInfo); wconn.to << WorkerProto::Serialise<ValidPathInfo>::write(*store, wconn, *pathInfo);
} else { } else {
HashType hashAlgo; HashType hashAlgo;
std::string baseName; std::string baseName;
@ -565,7 +565,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
auto results = store->buildPathsWithResults(drvs, mode); auto results = store->buildPathsWithResults(drvs, mode);
logger->stopWork(); logger->stopWork();
WorkerProto::write(*store, wconn, results); wconn.to << WorkerProto::write(*store, wconn, results);
break; break;
} }
@ -643,7 +643,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
auto res = store->buildDerivation(drvPath, drv, buildMode); auto res = store->buildDerivation(drvPath, drv, buildMode);
logger->stopWork(); logger->stopWork();
WorkerProto::write(*store, wconn, res); wconn.to << WorkerProto::write(*store, wconn, res);
break; break;
} }
@ -777,7 +777,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
else { else {
to << 1 to << 1
<< (i->second.deriver ? store->printStorePath(*i->second.deriver) : ""); << (i->second.deriver ? store->printStorePath(*i->second.deriver) : "");
WorkerProto::write(*store, wconn, i->second.references); wconn.to << WorkerProto::write(*store, wconn, i->second.references);
to << i->second.downloadSize to << i->second.downloadSize
<< i->second.narSize; << i->second.narSize;
} }
@ -800,7 +800,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
for (auto & i : infos) { for (auto & i : infos) {
to << store->printStorePath(i.first) to << store->printStorePath(i.first)
<< (i.second.deriver ? store->printStorePath(*i.second.deriver) : ""); << (i.second.deriver ? store->printStorePath(*i.second.deriver) : "");
WorkerProto::write(*store, wconn, i.second.references); wconn.to << WorkerProto::write(*store, wconn, i.second.references);
to << i.second.downloadSize << i.second.narSize; to << i.second.downloadSize << i.second.narSize;
} }
break; break;
@ -810,7 +810,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
logger->startWork(); logger->startWork();
auto paths = store->queryAllValidPaths(); auto paths = store->queryAllValidPaths();
logger->stopWork(); logger->stopWork();
WorkerProto::write(*store, wconn, paths); wconn.to << WorkerProto::write(*store, wconn, paths);
break; break;
} }
@ -827,7 +827,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
logger->stopWork(); logger->stopWork();
if (info) { if (info) {
to << 1; to << 1;
WorkerProto::write(*store, wconn, static_cast<const UnkeyedValidPathInfo &>(*info)); wconn.to << WorkerProto::write(*store, wconn, static_cast<const UnkeyedValidPathInfo &>(*info));
} else { } else {
to << 0; to << 0;
} }
@ -922,9 +922,9 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
uint64_t downloadSize, narSize; uint64_t downloadSize, narSize;
store->queryMissing(targets, willBuild, willSubstitute, unknown, downloadSize, narSize); store->queryMissing(targets, willBuild, willSubstitute, unknown, downloadSize, narSize);
logger->stopWork(); logger->stopWork();
WorkerProto::write(*store, wconn, willBuild); wconn.to << WorkerProto::write(*store, wconn, willBuild);
WorkerProto::write(*store, wconn, willSubstitute); wconn.to << WorkerProto::write(*store, wconn, willSubstitute);
WorkerProto::write(*store, wconn, unknown); wconn.to << WorkerProto::write(*store, wconn, unknown);
to << downloadSize << narSize; to << downloadSize << narSize;
break; break;
} }
@ -952,11 +952,11 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
if (GET_PROTOCOL_MINOR(clientVersion) < 31) { if (GET_PROTOCOL_MINOR(clientVersion) < 31) {
std::set<StorePath> outPaths; std::set<StorePath> outPaths;
if (info) outPaths.insert(info->outPath); if (info) outPaths.insert(info->outPath);
WorkerProto::write(*store, wconn, outPaths); wconn.to << WorkerProto::write(*store, wconn, outPaths);
} else { } else {
std::set<Realisation> realisations; std::set<Realisation> realisations;
if (info) realisations.insert(*info); if (info) realisations.insert(*info);
WorkerProto::write(*store, wconn, realisations); wconn.to << WorkerProto::write(*store, wconn, realisations);
} }
break; break;
} }
@ -1037,7 +1037,7 @@ void processConnection(
? store->isTrustedClient() ? store->isTrustedClient()
: std::optional { NotTrusted }; : std::optional { NotTrusted };
WorkerProto::WriteConn wconn {to, clientVersion}; WorkerProto::WriteConn wconn {to, clientVersion};
WorkerProto::write(*store, wconn, temp); wconn.to << WorkerProto::write(*store, wconn, temp);
} }
/* Send startup error messages to the client. */ /* Send startup error messages to the client. */

View file

@ -994,7 +994,7 @@ void writeDerivation(Sink & out, const Store & store, const BasicDerivation & dr
}, },
}, i.second.raw); }, i.second.raw);
} }
CommonProto::write(store, out << CommonProto::write(store,
CommonProto::WriteConn { .to = out }, CommonProto::WriteConn { .to = out },
drv.inputSrcs); drv.inputSrcs);
out << drv.platform << drv.builder << drv.args; out << drv.platform << drv.builder << drv.args;

View file

@ -46,7 +46,7 @@ void Store::exportPath(const StorePath & path, Sink & sink)
teeSink teeSink
<< exportMagic << exportMagic
<< printStorePath(path); << printStorePath(path);
CommonProto::write(*this, teeSink << CommonProto::write(*this,
CommonProto::WriteConn { .to = teeSink }, CommonProto::WriteConn { .to = teeSink },
info->references); info->references);
teeSink teeSink

View file

@ -185,7 +185,7 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor
<< printStorePath(info.path) << printStorePath(info.path)
<< (info.deriver ? printStorePath(*info.deriver) : "") << (info.deriver ? printStorePath(*info.deriver) : "")
<< info.narHash.to_string(Base16, false); << info.narHash.to_string(Base16, false);
ServeProto::write(*this, *conn, info.references); conn->to << ServeProto::write(*this, *conn, info.references);
conn->to conn->to
<< info.registrationTime << info.registrationTime
<< info.narSize << info.narSize
@ -214,7 +214,7 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor
conn->to conn->to
<< exportMagic << exportMagic
<< printStorePath(info.path); << printStorePath(info.path);
ServeProto::write(*this, *conn, info.references); conn->to << ServeProto::write(*this, *conn, info.references);
conn->to conn->to
<< (info.deriver ? printStorePath(*info.deriver) : "") << (info.deriver ? printStorePath(*info.deriver) : "")
<< 0 << 0
@ -366,7 +366,7 @@ public:
conn->to conn->to
<< ServeProto::Command::QueryClosure << ServeProto::Command::QueryClosure
<< includeOutputs; << includeOutputs;
ServeProto::write(*this, *conn, paths); conn->to << ServeProto::write(*this, *conn, paths);
conn->to.flush(); conn->to.flush();
for (auto & i : ServeProto::Serialise<StorePathSet>::read(*this, *conn)) for (auto & i : ServeProto::Serialise<StorePathSet>::read(*this, *conn))
@ -382,7 +382,7 @@ public:
<< ServeProto::Command::QueryValidPaths << ServeProto::Command::QueryValidPaths
<< false // lock << false // lock
<< maybeSubstitute; << maybeSubstitute;
ServeProto::write(*this, *conn, paths); conn->to << ServeProto::write(*this, *conn, paths);
conn->to.flush(); conn->to.flush();
return ServeProto::Serialise<StorePathSet>::read(*this, *conn); return ServeProto::Serialise<StorePathSet>::read(*this, *conn);

View file

@ -7,6 +7,7 @@
*/ */
#include "types.hh" #include "types.hh"
#include "serialise.hh"
namespace nix { namespace nix {
@ -45,7 +46,7 @@ struct LengthPrefixedProtoHelper;
struct LengthPrefixedProtoHelper< Inner, T > \ struct LengthPrefixedProtoHelper< Inner, T > \
{ \ { \
static T read(const Store & store, typename Inner::ReadConn conn); \ static T read(const Store & store, typename Inner::ReadConn conn); \
static void write(const Store & store, typename Inner::WriteConn conn, const T & str); \ [[nodiscard]] static WireFormatGenerator write(const Store & store, typename Inner::WriteConn conn, const T & str); \
private: \ private: \
template<typename U> using S = typename Inner::template Serialise<U>; \ template<typename U> using S = typename Inner::template Serialise<U>; \
} }
@ -78,13 +79,13 @@ LengthPrefixedProtoHelper<Inner, std::vector<T>>::read(
} }
template<class Inner, typename T> template<class Inner, typename T>
void WireFormatGenerator
LengthPrefixedProtoHelper<Inner, std::vector<T>>::write( LengthPrefixedProtoHelper<Inner, std::vector<T>>::write(
const Store & store, typename Inner::WriteConn conn, const std::vector<T> & resSet) const Store & store, typename Inner::WriteConn conn, const std::vector<T> & resSet)
{ {
conn.to << resSet.size(); co_yield resSet.size();
for (auto & key : resSet) { for (auto & key : resSet) {
S<T>::write(store, conn, key); co_yield S<T>::write(store, conn, key);
} }
} }
@ -102,13 +103,13 @@ LengthPrefixedProtoHelper<Inner, std::set<T>>::read(
} }
template<class Inner, typename T> template<class Inner, typename T>
void WireFormatGenerator
LengthPrefixedProtoHelper<Inner, std::set<T>>::write( LengthPrefixedProtoHelper<Inner, std::set<T>>::write(
const Store & store, typename Inner::WriteConn conn, const std::set<T> & resSet) const Store & store, typename Inner::WriteConn conn, const std::set<T> & resSet)
{ {
conn.to << resSet.size(); co_yield resSet.size();
for (auto & key : resSet) { for (auto & key : resSet) {
S<T>::write(store, conn, key); co_yield S<T>::write(store, conn, key);
} }
} }
@ -128,14 +129,14 @@ LengthPrefixedProtoHelper<Inner, std::map<K, V>>::read(
} }
template<class Inner, typename K, typename V> template<class Inner, typename K, typename V>
void WireFormatGenerator
LengthPrefixedProtoHelper<Inner, std::map<K, V>>::write( LengthPrefixedProtoHelper<Inner, std::map<K, V>>::write(
const Store & store, typename Inner::WriteConn conn, const std::map<K, V> & resMap) const Store & store, typename Inner::WriteConn conn, const std::map<K, V> & resMap)
{ {
conn.to << resMap.size(); co_yield resMap.size();
for (auto & i : resMap) { for (auto & i : resMap) {
S<K>::write(store, conn, i.first); co_yield S<K>::write(store, conn, i.first);
S<V>::write(store, conn, i.second); co_yield S<V>::write(store, conn, i.second);
} }
} }
@ -150,13 +151,24 @@ LengthPrefixedProtoHelper<Inner, std::tuple<Ts...>>::read(
} }
template<class Inner, typename... Ts> template<class Inner, typename... Ts>
void WireFormatGenerator
LengthPrefixedProtoHelper<Inner, std::tuple<Ts...>>::write( LengthPrefixedProtoHelper<Inner, std::tuple<Ts...>>::write(
const Store & store, typename Inner::WriteConn conn, const std::tuple<Ts...> & res) const Store & store, typename Inner::WriteConn conn, const std::tuple<Ts...> & res)
{ {
std::apply([&]<typename... Us>(const Us &... args) { auto fullArgs = std::apply(
(S<Us>::write(store, conn, args), ...); [&](auto &... rest) {
}, res); return std::tuple<const Store &, typename Inner::WriteConn &, const Ts &...>(
std::cref(store), conn, rest...
);
},
res
);
return std::apply(
[]<typename... Us>(auto & store, auto conn, const Us &... args) -> WireFormatGenerator {
(co_yield S<Us>::write(store, conn, args), ...);
},
fullArgs
);
} }
} }

View file

@ -203,7 +203,7 @@ StorePathSet RemoteStore::queryValidPaths(const StorePathSet & paths, Substitute
{ {
auto conn(getConnection()); auto conn(getConnection());
conn->to << WorkerProto::Op::QueryValidPaths; conn->to << WorkerProto::Op::QueryValidPaths;
WorkerProto::write(*this, *conn, paths); conn->to << WorkerProto::write(*this, *conn, paths);
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 27) { if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 27) {
conn->to << maybeSubstitute; conn->to << maybeSubstitute;
} }
@ -225,7 +225,7 @@ StorePathSet RemoteStore::querySubstitutablePaths(const StorePathSet & paths)
{ {
auto conn(getConnection()); auto conn(getConnection());
conn->to << WorkerProto::Op::QuerySubstitutablePaths; conn->to << WorkerProto::Op::QuerySubstitutablePaths;
WorkerProto::write(*this, *conn, paths); conn->to << WorkerProto::write(*this, *conn, paths);
conn.processStderr(); conn.processStderr();
return WorkerProto::Serialise<StorePathSet>::read(*this, *conn); return WorkerProto::Serialise<StorePathSet>::read(*this, *conn);
} }
@ -243,9 +243,9 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S
StorePathSet paths; StorePathSet paths;
for (auto & path : pathsMap) for (auto & path : pathsMap)
paths.insert(path.first); paths.insert(path.first);
WorkerProto::write(*this, *conn, paths); conn->to << WorkerProto::write(*this, *conn, paths);
} else } else
WorkerProto::write(*this, *conn, pathsMap); conn->to << WorkerProto::write(*this, *conn, pathsMap);
conn.processStderr(); conn.processStderr();
size_t count = readNum<size_t>(conn->from); size_t count = readNum<size_t>(conn->from);
for (size_t n = 0; n < count; n++) { for (size_t n = 0; n < count; n++) {
@ -377,7 +377,7 @@ ref<const ValidPathInfo> RemoteStore::addCAToStore(
<< WorkerProto::Op::AddToStore << WorkerProto::Op::AddToStore
<< name << name
<< caMethod.render(hashType); << caMethod.render(hashType);
WorkerProto::write(*this, *conn, references); conn->to << WorkerProto::write(*this, *conn, references);
conn->to << repair; conn->to << repair;
// The dump source may invoke the store, so we need to make some room. // The dump source may invoke the store, so we need to make some room.
@ -402,7 +402,7 @@ ref<const ValidPathInfo> RemoteStore::addCAToStore(
name, printHashType(hashType)); name, printHashType(hashType));
std::string s = dump.drain(); std::string s = dump.drain();
conn->to << WorkerProto::Op::AddTextToStore << name << s; conn->to << WorkerProto::Op::AddTextToStore << name << s;
WorkerProto::write(*this, *conn, references); conn->to << WorkerProto::write(*this, *conn, references);
conn.processStderr(); conn.processStderr();
}, },
[&](const FileIngestionMethod & fim) -> void { [&](const FileIngestionMethod & fim) -> void {
@ -462,7 +462,7 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
<< printStorePath(info.path) << printStorePath(info.path)
<< (info.deriver ? printStorePath(*info.deriver) : "") << (info.deriver ? printStorePath(*info.deriver) : "")
<< info.narHash.to_string(Base16, false); << info.narHash.to_string(Base16, false);
WorkerProto::write(*this, *conn, info.references); conn->to << WorkerProto::write(*this, *conn, info.references);
conn->to << info.registrationTime << info.narSize conn->to << info.registrationTime << info.narSize
<< info.ultimate << info.sigs << renderContentAddress(info.ca) << info.ultimate << info.sigs << renderContentAddress(info.ca)
<< repair << !checkSigs; << repair << !checkSigs;
@ -488,7 +488,7 @@ void RemoteStore::addMultipleToStore(
auto source = sinkToSource([&](Sink & sink) { auto source = sinkToSource([&](Sink & sink) {
sink << pathsToCopy.size(); sink << pathsToCopy.size();
for (auto & [pathInfo, pathSource] : pathsToCopy) { for (auto & [pathInfo, pathSource] : pathsToCopy) {
WorkerProto::Serialise<ValidPathInfo>::write(*this, sink << WorkerProto::Serialise<ValidPathInfo>::write(*this,
WorkerProto::WriteConn {sink, remoteVersion}, WorkerProto::WriteConn {sink, remoteVersion},
pathInfo); pathInfo);
pathSource->drainInto(sink); pathSource->drainInto(sink);
@ -536,7 +536,7 @@ void RemoteStore::registerDrvOutput(const Realisation & info)
conn->to << info.id.to_string(); conn->to << info.id.to_string();
conn->to << std::string(info.outPath.to_string()); conn->to << std::string(info.outPath.to_string());
} else { } else {
WorkerProto::write(*this, *conn, info); conn->to << WorkerProto::write(*this, *conn, info);
} }
conn.processStderr(); conn.processStderr();
} }
@ -597,7 +597,7 @@ void RemoteStore::buildPaths(const std::vector<DerivedPath> & drvPaths, BuildMod
auto conn(getConnection()); auto conn(getConnection());
conn->to << WorkerProto::Op::BuildPaths; conn->to << WorkerProto::Op::BuildPaths;
WorkerProto::write(*this, *conn, drvPaths); conn->to << WorkerProto::write(*this, *conn, drvPaths);
conn->to << buildMode; conn->to << buildMode;
conn.processStderr(); conn.processStderr();
readInt(conn->from); readInt(conn->from);
@ -615,7 +615,7 @@ std::vector<KeyedBuildResult> RemoteStore::buildPathsWithResults(
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 34) { if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 34) {
conn->to << WorkerProto::Op::BuildPathsWithResults; conn->to << WorkerProto::Op::BuildPathsWithResults;
WorkerProto::write(*this, *conn, paths); conn->to << WorkerProto::write(*this, *conn, paths);
conn->to << buildMode; conn->to << buildMode;
conn.processStderr(); conn.processStderr();
return WorkerProto::Serialise<std::vector<KeyedBuildResult>>::read(*this, *conn); return WorkerProto::Serialise<std::vector<KeyedBuildResult>>::read(*this, *conn);
@ -740,7 +740,7 @@ void RemoteStore::collectGarbage(const GCOptions & options, GCResults & results)
conn->to conn->to
<< WorkerProto::Op::CollectGarbage << options.action; << WorkerProto::Op::CollectGarbage << options.action;
WorkerProto::write(*this, *conn, options.pathsToDelete); conn->to << WorkerProto::write(*this, *conn, options.pathsToDelete);
conn->to << options.ignoreLiveness conn->to << options.ignoreLiveness
<< options.maxFreed << options.maxFreed
/* removed options */ /* removed options */
@ -792,7 +792,7 @@ void RemoteStore::queryMissing(const std::vector<DerivedPath> & targets,
{ {
auto conn(getConnection()); auto conn(getConnection());
conn->to << WorkerProto::Op::QueryMissing; conn->to << WorkerProto::Op::QueryMissing;
WorkerProto::write(*this, *conn, targets); conn->to << WorkerProto::write(*this, *conn, targets);
conn.processStderr(); conn.processStderr();
willBuild = WorkerProto::Serialise<StorePathSet>::read(*this, *conn); willBuild = WorkerProto::Serialise<StorePathSet>::read(*this, *conn);
willSubstitute = WorkerProto::Serialise<StorePathSet>::read(*this, *conn); willSubstitute = WorkerProto::Serialise<StorePathSet>::read(*this, *conn);

View file

@ -20,9 +20,9 @@ namespace nix {
{ \ { \
return LengthPrefixedProtoHelper<ServeProto, T >::read(store, conn); \ return LengthPrefixedProtoHelper<ServeProto, T >::read(store, conn); \
} \ } \
TEMPLATE void ServeProto::Serialise< T >::write(const Store & store, ServeProto::WriteConn conn, const T & t) \ TEMPLATE [[nodiscard]] WireFormatGenerator ServeProto::Serialise< T >::write(const Store & store, ServeProto::WriteConn conn, const T & t) \
{ \ { \
LengthPrefixedProtoHelper<ServeProto, T >::write(store, conn, t); \ return LengthPrefixedProtoHelper<ServeProto, T >::write(store, conn, t); \
} }
SERVE_USE_LENGTH_PREFIX_SERIALISER(template<typename T>, std::vector<T>) SERVE_USE_LENGTH_PREFIX_SERIALISER(template<typename T>, std::vector<T>)
@ -46,9 +46,10 @@ struct ServeProto::Serialise
return CommonProto::Serialise<T>::read(store, return CommonProto::Serialise<T>::read(store,
CommonProto::ReadConn { .from = conn.from }); CommonProto::ReadConn { .from = conn.from });
} }
static void write(const Store & store, ServeProto::WriteConn conn, const T & t) [[nodiscard]]
static WireFormatGenerator write(const Store & store, ServeProto::WriteConn conn, const T & t)
{ {
CommonProto::Serialise<T>::write(store, return CommonProto::Serialise<T>::write(store,
CommonProto::WriteConn { .to = conn.to }, CommonProto::WriteConn { .to = conn.to },
t); t);
} }

View file

@ -34,23 +34,22 @@ BuildResult ServeProto::Serialise<BuildResult>::read(const Store & store, ServeP
return status; return status;
} }
void ServeProto::Serialise<BuildResult>::write(const Store & store, ServeProto::WriteConn conn, const BuildResult & status) WireFormatGenerator ServeProto::Serialise<BuildResult>::write(const Store & store, ServeProto::WriteConn conn, const BuildResult & status)
{ {
conn.to co_yield status.status;
<< status.status co_yield status.errorMsg;
<< status.errorMsg;
if (GET_PROTOCOL_MINOR(conn.version) >= 3) if (GET_PROTOCOL_MINOR(conn.version) >= 3) {
conn.to co_yield status.timesBuilt;
<< status.timesBuilt co_yield status.isNonDeterministic;
<< status.isNonDeterministic co_yield status.startTime;
<< status.startTime co_yield status.stopTime;
<< status.stopTime; }
if (GET_PROTOCOL_MINOR(conn.version) >= 6) { if (GET_PROTOCOL_MINOR(conn.version) >= 6) {
DrvOutputs builtOutputs; DrvOutputs builtOutputs;
for (auto & [output, realisation] : status.builtOutputs) for (auto & [output, realisation] : status.builtOutputs)
builtOutputs.insert_or_assign(realisation.id, realisation); builtOutputs.insert_or_assign(realisation.id, realisation);
ServeProto::write(store, conn, builtOutputs); co_yield ServeProto::write(store, conn, builtOutputs);
} }
} }
@ -80,21 +79,19 @@ UnkeyedValidPathInfo ServeProto::Serialise<UnkeyedValidPathInfo>::read(const Sto
return info; return info;
} }
void ServeProto::Serialise<UnkeyedValidPathInfo>::write(const Store & store, WriteConn conn, const UnkeyedValidPathInfo & info) WireFormatGenerator ServeProto::Serialise<UnkeyedValidPathInfo>::write(const Store & store, WriteConn conn, const UnkeyedValidPathInfo & info)
{ {
conn.to co_yield (info.deriver ? store.printStorePath(*info.deriver) : "");
<< (info.deriver ? store.printStorePath(*info.deriver) : "");
ServeProto::write(store, conn, info.references); co_yield ServeProto::write(store, conn, info.references);
// !!! Maybe we want compression? // !!! Maybe we want compression?
conn.to co_yield info.narSize; // downloadSize, lie a little
<< info.narSize // downloadSize, lie a little co_yield info.narSize;
<< info.narSize; if (GET_PROTOCOL_MINOR(conn.version) >= 4) {
if (GET_PROTOCOL_MINOR(conn.version) >= 4) co_yield info.narHash.to_string(Base32, true);
conn.to co_yield renderContentAddress(info.ca);
<< info.narHash.to_string(Base32, true) co_yield info.sigs;
<< renderContentAddress(info.ca) }
<< info.sigs;
} }
} }

View file

@ -79,7 +79,7 @@ struct ServeProto
#if 0 #if 0
{ {
static T read(const Store & store, ReadConn conn); static T read(const Store & store, ReadConn conn);
static void write(const Store & store, WriteConn conn, const T & t); static WireFormatGenerator write(const Store & store, WriteConn conn, const T & t);
}; };
#endif #endif
@ -88,9 +88,10 @@ struct ServeProto
* infer the type instead of having to write it down explicitly. * infer the type instead of having to write it down explicitly.
*/ */
template<typename T> template<typename T>
static void write(const Store & store, WriteConn conn, const T & t) [[nodiscard]]
static WireFormatGenerator write(const Store & store, WriteConn conn, const T & t)
{ {
ServeProto::Serialise<T>::write(store, conn, t); return ServeProto::Serialise<T>::write(store, conn, t);
} }
}; };
@ -142,7 +143,7 @@ inline std::ostream & operator << (std::ostream & s, ServeProto::Command op)
struct ServeProto::Serialise< T > \ struct ServeProto::Serialise< T > \
{ \ { \
static T read(const Store & store, ServeProto::ReadConn conn); \ static T read(const Store & store, ServeProto::ReadConn conn); \
static void write(const Store & store, ServeProto::WriteConn conn, const T & t); \ [[nodiscard]] static WireFormatGenerator write(const Store & store, ServeProto::WriteConn conn, const T & t); \
}; };
template<> template<>

View file

@ -20,9 +20,9 @@ namespace nix {
{ \ { \
return LengthPrefixedProtoHelper<WorkerProto, T >::read(store, conn); \ return LengthPrefixedProtoHelper<WorkerProto, T >::read(store, conn); \
} \ } \
TEMPLATE void WorkerProto::Serialise< T >::write(const Store & store, WorkerProto::WriteConn conn, const T & t) \ TEMPLATE [[nodiscard]] WireFormatGenerator WorkerProto::Serialise< T >::write(const Store & store, WorkerProto::WriteConn conn, const T & t) \
{ \ { \
LengthPrefixedProtoHelper<WorkerProto, T >::write(store, conn, t); \ return LengthPrefixedProtoHelper<WorkerProto, T >::write(store, conn, t); \
} }
WORKER_USE_LENGTH_PREFIX_SERIALISER(template<typename T>, std::vector<T>) WORKER_USE_LENGTH_PREFIX_SERIALISER(template<typename T>, std::vector<T>)
@ -46,9 +46,10 @@ struct WorkerProto::Serialise
return CommonProto::Serialise<T>::read(store, return CommonProto::Serialise<T>::read(store,
CommonProto::ReadConn { .from = conn.from }); CommonProto::ReadConn { .from = conn.from });
} }
static void write(const Store & store, WorkerProto::WriteConn conn, const T & t) [[nodiscard]]
static WireFormatGenerator write(const Store & store, WorkerProto::WriteConn conn, const T & t)
{ {
CommonProto::Serialise<T>::write(store, return CommonProto::Serialise<T>::write(store,
CommonProto::WriteConn { .to = conn.to }, CommonProto::WriteConn { .to = conn.to },
t); t);
} }

View file

@ -28,17 +28,17 @@ std::optional<TrustedFlag> WorkerProto::Serialise<std::optional<TrustedFlag>>::r
} }
} }
void WorkerProto::Serialise<std::optional<TrustedFlag>>::write(const Store & store, WorkerProto::WriteConn conn, const std::optional<TrustedFlag> & optTrusted) WireFormatGenerator WorkerProto::Serialise<std::optional<TrustedFlag>>::write(const Store & store, WorkerProto::WriteConn conn, const std::optional<TrustedFlag> & optTrusted)
{ {
if (!optTrusted) if (!optTrusted)
conn.to << (uint8_t)0; co_yield (uint8_t)0;
else { else {
switch (*optTrusted) { switch (*optTrusted) {
case Trusted: case Trusted:
conn.to << (uint8_t)1; co_yield (uint8_t)1;
break; break;
case NotTrusted: case NotTrusted:
conn.to << (uint8_t)2; co_yield (uint8_t)2;
break; break;
default: default:
assert(false); assert(false);
@ -57,23 +57,23 @@ DerivedPath WorkerProto::Serialise<DerivedPath>::read(const Store & store, Worke
} }
} }
void WorkerProto::Serialise<DerivedPath>::write(const Store & store, WorkerProto::WriteConn conn, const DerivedPath & req) WireFormatGenerator WorkerProto::Serialise<DerivedPath>::write(const Store & store, WorkerProto::WriteConn conn, const DerivedPath & req)
{ {
if (GET_PROTOCOL_MINOR(conn.version) >= 30) { if (GET_PROTOCOL_MINOR(conn.version) >= 30) {
conn.to << req.to_string_legacy(store); co_yield req.to_string_legacy(store);
} else { } else {
auto sOrDrvPath = StorePathWithOutputs::tryFromDerivedPath(req); auto sOrDrvPath = StorePathWithOutputs::tryFromDerivedPath(req);
std::visit(overloaded { co_yield std::visit(overloaded {
[&](const StorePathWithOutputs & s) { [&](const StorePathWithOutputs & s) -> std::string {
conn.to << s.to_string(store); return s.to_string(store);
}, },
[&](const StorePath & drvPath) { [&](const StorePath & drvPath) -> std::string {
throw Error("trying to request '%s', but daemon protocol %d.%d is too old (< 1.29) to request a derivation file", throw Error("trying to request '%s', but daemon protocol %d.%d is too old (< 1.29) to request a derivation file",
store.printStorePath(drvPath), store.printStorePath(drvPath),
GET_PROTOCOL_MAJOR(conn.version), GET_PROTOCOL_MAJOR(conn.version),
GET_PROTOCOL_MINOR(conn.version)); GET_PROTOCOL_MINOR(conn.version));
}, },
[&](std::monostate) { [&](std::monostate) -> std::string {
throw Error("wanted to build a derivation that is itself a build product, but protocols do not support that. Try upgrading the Nix implementation on the other end of this connection"); throw Error("wanted to build a derivation that is itself a build product, but protocols do not support that. Try upgrading the Nix implementation on the other end of this connection");
}, },
}, sOrDrvPath); }, sOrDrvPath);
@ -91,10 +91,10 @@ KeyedBuildResult WorkerProto::Serialise<KeyedBuildResult>::read(const Store & st
}; };
} }
void WorkerProto::Serialise<KeyedBuildResult>::write(const Store & store, WorkerProto::WriteConn conn, const KeyedBuildResult & res) WireFormatGenerator WorkerProto::Serialise<KeyedBuildResult>::write(const Store & store, WorkerProto::WriteConn conn, const KeyedBuildResult & res)
{ {
WorkerProto::write(store, conn, res.path); co_yield WorkerProto::write(store, conn, res.path);
WorkerProto::write(store, conn, static_cast<const BuildResult &>(res)); co_yield WorkerProto::write(store, conn, static_cast<const BuildResult &>(res));
} }
@ -120,23 +120,21 @@ BuildResult WorkerProto::Serialise<BuildResult>::read(const Store & store, Worke
return res; return res;
} }
void WorkerProto::Serialise<BuildResult>::write(const Store & store, WorkerProto::WriteConn conn, const BuildResult & res) WireFormatGenerator WorkerProto::Serialise<BuildResult>::write(const Store & store, WorkerProto::WriteConn conn, const BuildResult & res)
{ {
conn.to co_yield res.status;
<< res.status co_yield res.errorMsg;
<< res.errorMsg;
if (GET_PROTOCOL_MINOR(conn.version) >= 29) { if (GET_PROTOCOL_MINOR(conn.version) >= 29) {
conn.to co_yield res.timesBuilt;
<< res.timesBuilt co_yield res.isNonDeterministic;
<< res.isNonDeterministic co_yield res.startTime;
<< res.startTime co_yield res.stopTime;
<< res.stopTime;
} }
if (GET_PROTOCOL_MINOR(conn.version) >= 28) { if (GET_PROTOCOL_MINOR(conn.version) >= 28) {
DrvOutputs builtOutputs; DrvOutputs builtOutputs;
for (auto & [output, realisation] : res.builtOutputs) for (auto & [output, realisation] : res.builtOutputs)
builtOutputs.insert_or_assign(realisation.id, realisation); builtOutputs.insert_or_assign(realisation.id, realisation);
WorkerProto::write(store, conn, builtOutputs); co_yield WorkerProto::write(store, conn, builtOutputs);
} }
} }
@ -150,10 +148,10 @@ ValidPathInfo WorkerProto::Serialise<ValidPathInfo>::read(const Store & store, R
}; };
} }
void WorkerProto::Serialise<ValidPathInfo>::write(const Store & store, WriteConn conn, const ValidPathInfo & pathInfo) WireFormatGenerator WorkerProto::Serialise<ValidPathInfo>::write(const Store & store, WriteConn conn, const ValidPathInfo & pathInfo)
{ {
WorkerProto::write(store, conn, pathInfo.path); co_yield WorkerProto::write(store, conn, pathInfo.path);
WorkerProto::write(store, conn, static_cast<const UnkeyedValidPathInfo &>(pathInfo)); co_yield WorkerProto::write(store, conn, static_cast<const UnkeyedValidPathInfo &>(pathInfo));
} }
@ -173,18 +171,17 @@ UnkeyedValidPathInfo WorkerProto::Serialise<UnkeyedValidPathInfo>::read(const St
return info; return info;
} }
void WorkerProto::Serialise<UnkeyedValidPathInfo>::write(const Store & store, WriteConn conn, const UnkeyedValidPathInfo & pathInfo) WireFormatGenerator WorkerProto::Serialise<UnkeyedValidPathInfo>::write(const Store & store, WriteConn conn, const UnkeyedValidPathInfo & pathInfo)
{ {
conn.to co_yield (pathInfo.deriver ? store.printStorePath(*pathInfo.deriver) : "");
<< (pathInfo.deriver ? store.printStorePath(*pathInfo.deriver) : "") co_yield pathInfo.narHash.to_string(Base16, false);
<< pathInfo.narHash.to_string(Base16, false); co_yield WorkerProto::write(store, conn, pathInfo.references);
WorkerProto::write(store, conn, pathInfo.references); co_yield pathInfo.registrationTime;
conn.to << pathInfo.registrationTime << pathInfo.narSize; co_yield pathInfo.narSize;
conn.to co_yield pathInfo.ultimate;
<< pathInfo.ultimate co_yield pathInfo.sigs;
<< pathInfo.sigs co_yield renderContentAddress(pathInfo.ca);
<< renderContentAddress(pathInfo.ca);
} }
} }

View file

@ -122,7 +122,7 @@ struct WorkerProto
#if 0 #if 0
{ {
static T read(const Store & store, ReadConn conn); static T read(const Store & store, ReadConn conn);
static void write(const Store & store, WriteConn conn, const T & t); static WireFormatGenerator write(const Store & store, WriteConn conn, const T & t);
}; };
#endif #endif
@ -131,9 +131,10 @@ struct WorkerProto
* infer the type instead of having to write it down explicitly. * infer the type instead of having to write it down explicitly.
*/ */
template<typename T> template<typename T>
static void write(const Store & store, WriteConn conn, const T & t) [[nodiscard]]
static WireFormatGenerator write(const Store & store, WriteConn conn, const T & t)
{ {
WorkerProto::Serialise<T>::write(store, conn, t); return WorkerProto::Serialise<T>::write(store, conn, t);
} }
}; };
@ -219,7 +220,7 @@ inline std::ostream & operator << (std::ostream & s, WorkerProto::Op op)
struct WorkerProto::Serialise< T > \ struct WorkerProto::Serialise< T > \
{ \ { \
static T read(const Store & store, WorkerProto::ReadConn conn); \ static T read(const Store & store, WorkerProto::ReadConn conn); \
static void write(const Store & store, WorkerProto::WriteConn conn, const T & t); \ [[nodiscard]] static WireFormatGenerator write(const Store & store, WorkerProto::WriteConn conn, const T & t); \
}; };
template<> template<>

View file

@ -880,7 +880,8 @@ static void opServe(Strings opFlags, Strings opArgs)
store->substitutePaths(paths); store->substitutePaths(paths);
} }
ServeProto::write(*store, wconn, store->queryValidPaths(paths)); auto valid = store->queryValidPaths(paths);
wconn.to << ServeProto::write(*store, wconn, valid);
break; break;
} }
@ -891,7 +892,7 @@ static void opServe(Strings opFlags, Strings opArgs)
try { try {
auto info = store->queryPathInfo(i); auto info = store->queryPathInfo(i);
out << store->printStorePath(info->path); out << store->printStorePath(info->path);
ServeProto::write(*store, wconn, static_cast<const UnkeyedValidPathInfo &>(*info)); wconn.to << ServeProto::write(*store, wconn, static_cast<const UnkeyedValidPathInfo &>(*info));
} catch (InvalidPath &) { } catch (InvalidPath &) {
} }
} }
@ -950,7 +951,7 @@ static void opServe(Strings opFlags, Strings opArgs)
MonitorFdHup monitor(in.fd); MonitorFdHup monitor(in.fd);
auto status = store->buildDerivation(drvPath, drv); auto status = store->buildDerivation(drvPath, drv);
ServeProto::write(*store, wconn, status); wconn.to << ServeProto::write(*store, wconn, status);
break; break;
} }
@ -959,7 +960,7 @@ static void opServe(Strings opFlags, Strings opArgs)
StorePathSet closure; StorePathSet closure;
store->computeFSClosure(ServeProto::Serialise<StorePathSet>::read(*store, rconn), store->computeFSClosure(ServeProto::Serialise<StorePathSet>::read(*store, rconn),
closure, false, includeOutputs); closure, false, includeOutputs);
ServeProto::write(*store, wconn, closure); wconn.to << ServeProto::write(*store, wconn, closure);
break; break;
} }

View file

@ -50,7 +50,7 @@ public:
auto file = goldenMaster(testStem); auto file = goldenMaster(testStem);
StringSink to; StringSink to;
CommonProto::write( to << CommonProto::write(
*store, *store,
CommonProto::WriteConn { .to = to }, CommonProto::WriteConn { .to = to },
value); value);

View file

@ -56,7 +56,7 @@ public:
auto file = ProtoTest<Proto, protocolDir>::goldenMaster(testStem); auto file = ProtoTest<Proto, protocolDir>::goldenMaster(testStem);
StringSink to; StringSink to;
Proto::write( to << Proto::write(
*LibStoreTest::store, *LibStoreTest::store,
typename Proto::WriteConn {to, version}, typename Proto::WriteConn {to, version},
value); value);