From a5d1f698417dff4e470eb94c38ad6c4e5ebf38ff Mon Sep 17 00:00:00 2001 From: eldritch horrors Date: Sat, 18 May 2024 02:35:34 +0200 Subject: [PATCH] libstore: generatorize protocol serializers this is cursed. deeply and profoundly cursed. under NO CIRCUMSTANCES must protocol serializer helpers be applied to temporaries! doing so will inevitably cause dangling references and cause the entire thing to crash. we need to do this even so to get rid of boost coroutines, and likewise to encapsulate the serializers we suffer today at least a little bit to allow a gradual migration to an actual IPC protocol. (this isn't a problem that's unique to generators. c++ coroutines in general cannot safely take references to arbitrary temporaries since c++ does not have a lifetime system that can make this safe. -sigh-) Change-Id: I2921ba451e04d86798752d140885d3c5cc08e146 --- src/libstore/build/derivation-goal.cc | 4 +- src/libstore/common-protocol-impl.hh | 4 +- src/libstore/common-protocol.cc | 32 +++++---- src/libstore/common-protocol.hh | 7 +- src/libstore/daemon.cc | 34 ++++----- src/libstore/derivations.cc | 2 +- src/libstore/export-import.cc | 2 +- src/libstore/legacy-ssh-store.cc | 8 +-- .../length-prefixed-protocol-helper.hh | 42 +++++++---- src/libstore/remote-store.cc | 26 +++---- src/libstore/serve-protocol-impl.hh | 9 +-- src/libstore/serve-protocol.cc | 43 ++++++----- src/libstore/serve-protocol.hh | 9 +-- src/libstore/worker-protocol-impl.hh | 9 +-- src/libstore/worker-protocol.cc | 71 +++++++++---------- src/libstore/worker-protocol.hh | 9 +-- src/nix-store/nix-store.cc | 9 +-- tests/unit/libstore/common-protocol.cc | 2 +- tests/unit/libstore/protocol.hh | 2 +- 19 files changed, 170 insertions(+), 154 deletions(-) diff --git a/src/libstore/build/derivation-goal.cc b/src/libstore/build/derivation-goal.cc index 51597428d..a97c32599 100644 --- a/src/libstore/build/derivation-goal.cc +++ b/src/libstore/build/derivation-goal.cc @@ -1208,7 +1208,7 @@ HookReply DerivationGoal::tryBuildHook() /* Tell the hook all the inputs that have to be copied to the remote system. */ - CommonProto::write(worker.store, conn, inputPaths); + conn.to << CommonProto::write(worker.store, conn, inputPaths); /* Tell the hooks the missing outputs that have to be copied back from the remote system. */ @@ -1219,7 +1219,7 @@ HookReply DerivationGoal::tryBuildHook() if (buildMode != bmCheck && status.known && status.known->isValid()) continue; missingOutputs.insert(outputName); } - CommonProto::write(worker.store, conn, missingOutputs); + conn.to << CommonProto::write(worker.store, conn, missingOutputs); } hook->sink = FdSink(); diff --git a/src/libstore/common-protocol-impl.hh b/src/libstore/common-protocol-impl.hh index 079c182b8..fd1387e95 100644 --- a/src/libstore/common-protocol-impl.hh +++ b/src/libstore/common-protocol-impl.hh @@ -20,9 +20,9 @@ namespace nix { { \ return LengthPrefixedProtoHelper::read(store, conn); \ } \ - TEMPLATE void CommonProto::Serialise< T >::write(const Store & store, CommonProto::WriteConn conn, const T & t) \ + TEMPLATE [[nodiscard]] WireFormatGenerator CommonProto::Serialise< T >::write(const Store & store, CommonProto::WriteConn conn, const T & t) \ { \ - LengthPrefixedProtoHelper::write(store, conn, t); \ + return LengthPrefixedProtoHelper::write(store, conn, t); \ } COMMON_USE_LENGTH_PREFIX_SERIALISER(template, std::vector) diff --git a/src/libstore/common-protocol.cc b/src/libstore/common-protocol.cc index 456ad2b1f..4e2b2df31 100644 --- a/src/libstore/common-protocol.cc +++ b/src/libstore/common-protocol.cc @@ -16,9 +16,9 @@ std::string CommonProto::Serialise::read(const Store & store, Commo return readString(conn.from); } -void CommonProto::Serialise::write(const Store & store, CommonProto::WriteConn conn, const std::string & str) +WireFormatGenerator CommonProto::Serialise::write(const Store & store, CommonProto::WriteConn conn, const std::string & str) { - conn.to << str; + co_yield str; } @@ -27,9 +27,9 @@ StorePath CommonProto::Serialise::read(const Store & store, CommonPro return store.parseStorePath(readString(conn.from)); } -void CommonProto::Serialise::write(const Store & store, CommonProto::WriteConn conn, const StorePath & storePath) +WireFormatGenerator CommonProto::Serialise::write(const Store & store, CommonProto::WriteConn conn, const StorePath & storePath) { - conn.to << store.printStorePath(storePath); + co_yield store.printStorePath(storePath); } @@ -38,9 +38,9 @@ ContentAddress CommonProto::Serialise::read(const Store & store, return ContentAddress::parse(readString(conn.from)); } -void CommonProto::Serialise::write(const Store & store, CommonProto::WriteConn conn, const ContentAddress & ca) +WireFormatGenerator CommonProto::Serialise::write(const Store & store, CommonProto::WriteConn conn, const ContentAddress & ca) { - conn.to << renderContentAddress(ca); + co_yield renderContentAddress(ca); } @@ -53,9 +53,9 @@ Realisation CommonProto::Serialise::read(const Store & store, Commo ); } -void CommonProto::Serialise::write(const Store & store, CommonProto::WriteConn conn, const Realisation & realisation) +WireFormatGenerator CommonProto::Serialise::write(const Store & store, CommonProto::WriteConn conn, const Realisation & realisation) { - conn.to << realisation.toJSON().dump(); + co_yield realisation.toJSON().dump(); } @@ -64,9 +64,9 @@ DrvOutput CommonProto::Serialise::read(const Store & store, CommonPro return DrvOutput::parse(readString(conn.from)); } -void CommonProto::Serialise::write(const Store & store, CommonProto::WriteConn conn, const DrvOutput & drvOutput) +WireFormatGenerator CommonProto::Serialise::write(const Store & store, CommonProto::WriteConn conn, const DrvOutput & drvOutput) { - conn.to << drvOutput.to_string(); + co_yield drvOutput.to_string(); } @@ -76,9 +76,11 @@ std::optional CommonProto::Serialise>::read( return s == "" ? std::optional {} : store.parseStorePath(s); } -void CommonProto::Serialise>::write(const Store & store, CommonProto::WriteConn conn, const std::optional & storePathOpt) +WireFormatGenerator CommonProto::Serialise>::write(const Store & store, CommonProto::WriteConn conn, const std::optional & storePathOpt) { - conn.to << (storePathOpt ? store.printStorePath(*storePathOpt) : ""); + return [](std::string s) -> WireFormatGenerator { + co_yield s; + }(storePathOpt ? store.printStorePath(*storePathOpt) : ""); } @@ -87,9 +89,11 @@ std::optional CommonProto::Serialise>::write(const Store & store, CommonProto::WriteConn conn, const std::optional & caOpt) +WireFormatGenerator CommonProto::Serialise>::write(const Store & store, CommonProto::WriteConn conn, const std::optional & caOpt) { - conn.to << (caOpt ? renderContentAddress(*caOpt) : ""); + return [](std::string s) -> WireFormatGenerator { + co_yield s; + }(caOpt ? renderContentAddress(*caOpt) : ""); } } diff --git a/src/libstore/common-protocol.hh b/src/libstore/common-protocol.hh index f3f28972a..7f9f2808a 100644 --- a/src/libstore/common-protocol.hh +++ b/src/libstore/common-protocol.hh @@ -48,9 +48,10 @@ struct CommonProto * infer the type instead of having to write it down explicitly. */ template - static void write(const Store & store, WriteConn conn, const T & t) + [[nodiscard]] + static WireFormatGenerator write(const Store & store, WriteConn conn, const T & t) { - CommonProto::Serialise::write(store, conn, t); + return CommonProto::Serialise::write(store, conn, t); } }; @@ -58,7 +59,7 @@ struct CommonProto struct CommonProto::Serialise< T > \ { \ static T read(const Store & store, CommonProto::ReadConn conn); \ - static void write(const Store & store, CommonProto::WriteConn conn, const T & str); \ + [[nodiscard]] static WireFormatGenerator write(const Store & store, CommonProto::WriteConn conn, const T & str); \ } template<> diff --git a/src/libstore/daemon.cc b/src/libstore/daemon.cc index cae5b54fd..46500e7d3 100644 --- a/src/libstore/daemon.cc +++ b/src/libstore/daemon.cc @@ -291,7 +291,7 @@ static void performOp(TunnelLogger * logger, ref store, } auto res = store->queryValidPaths(paths, substitute); logger->stopWork(); - WorkerProto::write(*store, wconn, res); + wconn.to << WorkerProto::write(*store, wconn, res); break; } @@ -300,7 +300,7 @@ static void performOp(TunnelLogger * logger, ref store, logger->startWork(); auto res = store->querySubstitutablePaths(paths); logger->stopWork(); - WorkerProto::write(*store, wconn, res); + wconn.to << WorkerProto::write(*store, wconn, res); break; } @@ -365,7 +365,7 @@ static void performOp(TunnelLogger * logger, ref store, #pragma GCC diagnostic pop logger->stopWork(); - WorkerProto::write(*store, wconn, paths); + wconn.to << WorkerProto::write(*store, wconn, paths); break; } @@ -385,7 +385,7 @@ static void performOp(TunnelLogger * logger, ref store, logger->startWork(); auto outputs = store->queryPartialDerivationOutputMap(path); logger->stopWork(); - WorkerProto::write(*store, wconn, outputs); + wconn.to << WorkerProto::write(*store, wconn, outputs); break; } @@ -432,7 +432,7 @@ static void performOp(TunnelLogger * logger, ref store, }(); logger->stopWork(); - WorkerProto::Serialise::write(*store, wconn, *pathInfo); + wconn.to << WorkerProto::Serialise::write(*store, wconn, *pathInfo); } else { HashType hashAlgo; std::string baseName; @@ -565,7 +565,7 @@ static void performOp(TunnelLogger * logger, ref store, auto results = store->buildPathsWithResults(drvs, mode); logger->stopWork(); - WorkerProto::write(*store, wconn, results); + wconn.to << WorkerProto::write(*store, wconn, results); break; } @@ -643,7 +643,7 @@ static void performOp(TunnelLogger * logger, ref store, auto res = store->buildDerivation(drvPath, drv, buildMode); logger->stopWork(); - WorkerProto::write(*store, wconn, res); + wconn.to << WorkerProto::write(*store, wconn, res); break; } @@ -777,7 +777,7 @@ static void performOp(TunnelLogger * logger, ref store, else { to << 1 << (i->second.deriver ? store->printStorePath(*i->second.deriver) : ""); - WorkerProto::write(*store, wconn, i->second.references); + wconn.to << WorkerProto::write(*store, wconn, i->second.references); to << i->second.downloadSize << i->second.narSize; } @@ -800,7 +800,7 @@ static void performOp(TunnelLogger * logger, ref store, for (auto & i : infos) { to << store->printStorePath(i.first) << (i.second.deriver ? store->printStorePath(*i.second.deriver) : ""); - WorkerProto::write(*store, wconn, i.second.references); + wconn.to << WorkerProto::write(*store, wconn, i.second.references); to << i.second.downloadSize << i.second.narSize; } break; @@ -810,7 +810,7 @@ static void performOp(TunnelLogger * logger, ref store, logger->startWork(); auto paths = store->queryAllValidPaths(); logger->stopWork(); - WorkerProto::write(*store, wconn, paths); + wconn.to << WorkerProto::write(*store, wconn, paths); break; } @@ -827,7 +827,7 @@ static void performOp(TunnelLogger * logger, ref store, logger->stopWork(); if (info) { to << 1; - WorkerProto::write(*store, wconn, static_cast(*info)); + wconn.to << WorkerProto::write(*store, wconn, static_cast(*info)); } else { to << 0; } @@ -922,9 +922,9 @@ static void performOp(TunnelLogger * logger, ref store, uint64_t downloadSize, narSize; store->queryMissing(targets, willBuild, willSubstitute, unknown, downloadSize, narSize); logger->stopWork(); - WorkerProto::write(*store, wconn, willBuild); - WorkerProto::write(*store, wconn, willSubstitute); - WorkerProto::write(*store, wconn, unknown); + wconn.to << WorkerProto::write(*store, wconn, willBuild); + wconn.to << WorkerProto::write(*store, wconn, willSubstitute); + wconn.to << WorkerProto::write(*store, wconn, unknown); to << downloadSize << narSize; break; } @@ -952,11 +952,11 @@ static void performOp(TunnelLogger * logger, ref store, if (GET_PROTOCOL_MINOR(clientVersion) < 31) { std::set outPaths; if (info) outPaths.insert(info->outPath); - WorkerProto::write(*store, wconn, outPaths); + wconn.to << WorkerProto::write(*store, wconn, outPaths); } else { std::set realisations; if (info) realisations.insert(*info); - WorkerProto::write(*store, wconn, realisations); + wconn.to << WorkerProto::write(*store, wconn, realisations); } break; } @@ -1037,7 +1037,7 @@ void processConnection( ? store->isTrustedClient() : std::optional { NotTrusted }; WorkerProto::WriteConn wconn {to, clientVersion}; - WorkerProto::write(*store, wconn, temp); + wconn.to << WorkerProto::write(*store, wconn, temp); } /* Send startup error messages to the client. */ diff --git a/src/libstore/derivations.cc b/src/libstore/derivations.cc index 4e70804e5..cbb22e010 100644 --- a/src/libstore/derivations.cc +++ b/src/libstore/derivations.cc @@ -994,7 +994,7 @@ void writeDerivation(Sink & out, const Store & store, const BasicDerivation & dr }, }, i.second.raw); } - CommonProto::write(store, + out << CommonProto::write(store, CommonProto::WriteConn { .to = out }, drv.inputSrcs); out << drv.platform << drv.builder << drv.args; diff --git a/src/libstore/export-import.cc b/src/libstore/export-import.cc index fa1020537..0355e5c87 100644 --- a/src/libstore/export-import.cc +++ b/src/libstore/export-import.cc @@ -46,7 +46,7 @@ void Store::exportPath(const StorePath & path, Sink & sink) teeSink << exportMagic << printStorePath(path); - CommonProto::write(*this, + teeSink << CommonProto::write(*this, CommonProto::WriteConn { .to = teeSink }, info->references); teeSink diff --git a/src/libstore/legacy-ssh-store.cc b/src/libstore/legacy-ssh-store.cc index 57d333f12..e8394d805 100644 --- a/src/libstore/legacy-ssh-store.cc +++ b/src/libstore/legacy-ssh-store.cc @@ -185,7 +185,7 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor << printStorePath(info.path) << (info.deriver ? printStorePath(*info.deriver) : "") << info.narHash.to_string(Base16, false); - ServeProto::write(*this, *conn, info.references); + conn->to << ServeProto::write(*this, *conn, info.references); conn->to << info.registrationTime << info.narSize @@ -214,7 +214,7 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor conn->to << exportMagic << printStorePath(info.path); - ServeProto::write(*this, *conn, info.references); + conn->to << ServeProto::write(*this, *conn, info.references); conn->to << (info.deriver ? printStorePath(*info.deriver) : "") << 0 @@ -366,7 +366,7 @@ public: conn->to << ServeProto::Command::QueryClosure << includeOutputs; - ServeProto::write(*this, *conn, paths); + conn->to << ServeProto::write(*this, *conn, paths); conn->to.flush(); for (auto & i : ServeProto::Serialise::read(*this, *conn)) @@ -382,7 +382,7 @@ public: << ServeProto::Command::QueryValidPaths << false // lock << maybeSubstitute; - ServeProto::write(*this, *conn, paths); + conn->to << ServeProto::write(*this, *conn, paths); conn->to.flush(); return ServeProto::Serialise::read(*this, *conn); diff --git a/src/libstore/length-prefixed-protocol-helper.hh b/src/libstore/length-prefixed-protocol-helper.hh index 4061b0cd6..1475d2690 100644 --- a/src/libstore/length-prefixed-protocol-helper.hh +++ b/src/libstore/length-prefixed-protocol-helper.hh @@ -7,6 +7,7 @@ */ #include "types.hh" +#include "serialise.hh" namespace nix { @@ -45,7 +46,7 @@ struct LengthPrefixedProtoHelper; struct LengthPrefixedProtoHelper< Inner, T > \ { \ static T read(const Store & store, typename Inner::ReadConn conn); \ - static void write(const Store & store, typename Inner::WriteConn conn, const T & str); \ + [[nodiscard]] static WireFormatGenerator write(const Store & store, typename Inner::WriteConn conn, const T & str); \ private: \ template using S = typename Inner::template Serialise; \ } @@ -78,13 +79,13 @@ LengthPrefixedProtoHelper>::read( } template -void +WireFormatGenerator LengthPrefixedProtoHelper>::write( const Store & store, typename Inner::WriteConn conn, const std::vector & resSet) { - conn.to << resSet.size(); + co_yield resSet.size(); for (auto & key : resSet) { - S::write(store, conn, key); + co_yield S::write(store, conn, key); } } @@ -102,13 +103,13 @@ LengthPrefixedProtoHelper>::read( } template -void +WireFormatGenerator LengthPrefixedProtoHelper>::write( const Store & store, typename Inner::WriteConn conn, const std::set & resSet) { - conn.to << resSet.size(); + co_yield resSet.size(); for (auto & key : resSet) { - S::write(store, conn, key); + co_yield S::write(store, conn, key); } } @@ -128,14 +129,14 @@ LengthPrefixedProtoHelper>::read( } template -void +WireFormatGenerator LengthPrefixedProtoHelper>::write( const Store & store, typename Inner::WriteConn conn, const std::map & resMap) { - conn.to << resMap.size(); + co_yield resMap.size(); for (auto & i : resMap) { - S::write(store, conn, i.first); - S::write(store, conn, i.second); + co_yield S::write(store, conn, i.first); + co_yield S::write(store, conn, i.second); } } @@ -150,13 +151,24 @@ LengthPrefixedProtoHelper>::read( } template -void +WireFormatGenerator LengthPrefixedProtoHelper>::write( const Store & store, typename Inner::WriteConn conn, const std::tuple & res) { - std::apply([&](const Us &... args) { - (S::write(store, conn, args), ...); - }, res); + auto fullArgs = std::apply( + [&](auto &... rest) { + return std::tuple( + std::cref(store), conn, rest... + ); + }, + res + ); + return std::apply( + [](auto & store, auto conn, const Us &... args) -> WireFormatGenerator { + (co_yield S::write(store, conn, args), ...); + }, + fullArgs + ); } } diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index 9ff01351c..bf2e60918 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -203,7 +203,7 @@ StorePathSet RemoteStore::queryValidPaths(const StorePathSet & paths, Substitute { auto conn(getConnection()); conn->to << WorkerProto::Op::QueryValidPaths; - WorkerProto::write(*this, *conn, paths); + conn->to << WorkerProto::write(*this, *conn, paths); if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 27) { conn->to << maybeSubstitute; } @@ -225,7 +225,7 @@ StorePathSet RemoteStore::querySubstitutablePaths(const StorePathSet & paths) { auto conn(getConnection()); conn->to << WorkerProto::Op::QuerySubstitutablePaths; - WorkerProto::write(*this, *conn, paths); + conn->to << WorkerProto::write(*this, *conn, paths); conn.processStderr(); return WorkerProto::Serialise::read(*this, *conn); } @@ -243,9 +243,9 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S StorePathSet paths; for (auto & path : pathsMap) paths.insert(path.first); - WorkerProto::write(*this, *conn, paths); + conn->to << WorkerProto::write(*this, *conn, paths); } else - WorkerProto::write(*this, *conn, pathsMap); + conn->to << WorkerProto::write(*this, *conn, pathsMap); conn.processStderr(); size_t count = readNum(conn->from); for (size_t n = 0; n < count; n++) { @@ -377,7 +377,7 @@ ref RemoteStore::addCAToStore( << WorkerProto::Op::AddToStore << name << caMethod.render(hashType); - WorkerProto::write(*this, *conn, references); + conn->to << WorkerProto::write(*this, *conn, references); conn->to << repair; // The dump source may invoke the store, so we need to make some room. @@ -402,7 +402,7 @@ ref RemoteStore::addCAToStore( name, printHashType(hashType)); std::string s = dump.drain(); conn->to << WorkerProto::Op::AddTextToStore << name << s; - WorkerProto::write(*this, *conn, references); + conn->to << WorkerProto::write(*this, *conn, references); conn.processStderr(); }, [&](const FileIngestionMethod & fim) -> void { @@ -462,7 +462,7 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source, << printStorePath(info.path) << (info.deriver ? printStorePath(*info.deriver) : "") << info.narHash.to_string(Base16, false); - WorkerProto::write(*this, *conn, info.references); + conn->to << WorkerProto::write(*this, *conn, info.references); conn->to << info.registrationTime << info.narSize << info.ultimate << info.sigs << renderContentAddress(info.ca) << repair << !checkSigs; @@ -488,7 +488,7 @@ void RemoteStore::addMultipleToStore( auto source = sinkToSource([&](Sink & sink) { sink << pathsToCopy.size(); for (auto & [pathInfo, pathSource] : pathsToCopy) { - WorkerProto::Serialise::write(*this, + sink << WorkerProto::Serialise::write(*this, WorkerProto::WriteConn {sink, remoteVersion}, pathInfo); pathSource->drainInto(sink); @@ -536,7 +536,7 @@ void RemoteStore::registerDrvOutput(const Realisation & info) conn->to << info.id.to_string(); conn->to << std::string(info.outPath.to_string()); } else { - WorkerProto::write(*this, *conn, info); + conn->to << WorkerProto::write(*this, *conn, info); } conn.processStderr(); } @@ -597,7 +597,7 @@ void RemoteStore::buildPaths(const std::vector & drvPaths, BuildMod auto conn(getConnection()); conn->to << WorkerProto::Op::BuildPaths; - WorkerProto::write(*this, *conn, drvPaths); + conn->to << WorkerProto::write(*this, *conn, drvPaths); conn->to << buildMode; conn.processStderr(); readInt(conn->from); @@ -615,7 +615,7 @@ std::vector RemoteStore::buildPathsWithResults( if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 34) { conn->to << WorkerProto::Op::BuildPathsWithResults; - WorkerProto::write(*this, *conn, paths); + conn->to << WorkerProto::write(*this, *conn, paths); conn->to << buildMode; conn.processStderr(); return WorkerProto::Serialise>::read(*this, *conn); @@ -740,7 +740,7 @@ void RemoteStore::collectGarbage(const GCOptions & options, GCResults & results) conn->to << WorkerProto::Op::CollectGarbage << options.action; - WorkerProto::write(*this, *conn, options.pathsToDelete); + conn->to << WorkerProto::write(*this, *conn, options.pathsToDelete); conn->to << options.ignoreLiveness << options.maxFreed /* removed options */ @@ -792,7 +792,7 @@ void RemoteStore::queryMissing(const std::vector & targets, { auto conn(getConnection()); conn->to << WorkerProto::Op::QueryMissing; - WorkerProto::write(*this, *conn, targets); + conn->to << WorkerProto::write(*this, *conn, targets); conn.processStderr(); willBuild = WorkerProto::Serialise::read(*this, *conn); willSubstitute = WorkerProto::Serialise::read(*this, *conn); diff --git a/src/libstore/serve-protocol-impl.hh b/src/libstore/serve-protocol-impl.hh index a3ce81026..845889451 100644 --- a/src/libstore/serve-protocol-impl.hh +++ b/src/libstore/serve-protocol-impl.hh @@ -20,9 +20,9 @@ namespace nix { { \ return LengthPrefixedProtoHelper::read(store, conn); \ } \ - TEMPLATE void ServeProto::Serialise< T >::write(const Store & store, ServeProto::WriteConn conn, const T & t) \ + TEMPLATE [[nodiscard]] WireFormatGenerator ServeProto::Serialise< T >::write(const Store & store, ServeProto::WriteConn conn, const T & t) \ { \ - LengthPrefixedProtoHelper::write(store, conn, t); \ + return LengthPrefixedProtoHelper::write(store, conn, t); \ } SERVE_USE_LENGTH_PREFIX_SERIALISER(template, std::vector) @@ -46,9 +46,10 @@ struct ServeProto::Serialise return CommonProto::Serialise::read(store, CommonProto::ReadConn { .from = conn.from }); } - static void write(const Store & store, ServeProto::WriteConn conn, const T & t) + [[nodiscard]] + static WireFormatGenerator write(const Store & store, ServeProto::WriteConn conn, const T & t) { - CommonProto::Serialise::write(store, + return CommonProto::Serialise::write(store, CommonProto::WriteConn { .to = conn.to }, t); } diff --git a/src/libstore/serve-protocol.cc b/src/libstore/serve-protocol.cc index 603137c81..723a494a5 100644 --- a/src/libstore/serve-protocol.cc +++ b/src/libstore/serve-protocol.cc @@ -34,23 +34,22 @@ BuildResult ServeProto::Serialise::read(const Store & store, ServeP return status; } -void ServeProto::Serialise::write(const Store & store, ServeProto::WriteConn conn, const BuildResult & status) +WireFormatGenerator ServeProto::Serialise::write(const Store & store, ServeProto::WriteConn conn, const BuildResult & status) { - conn.to - << status.status - << status.errorMsg; + co_yield status.status; + co_yield status.errorMsg; - if (GET_PROTOCOL_MINOR(conn.version) >= 3) - conn.to - << status.timesBuilt - << status.isNonDeterministic - << status.startTime - << status.stopTime; + if (GET_PROTOCOL_MINOR(conn.version) >= 3) { + co_yield status.timesBuilt; + co_yield status.isNonDeterministic; + co_yield status.startTime; + co_yield status.stopTime; + } if (GET_PROTOCOL_MINOR(conn.version) >= 6) { DrvOutputs builtOutputs; for (auto & [output, realisation] : status.builtOutputs) builtOutputs.insert_or_assign(realisation.id, realisation); - ServeProto::write(store, conn, builtOutputs); + co_yield ServeProto::write(store, conn, builtOutputs); } } @@ -80,21 +79,19 @@ UnkeyedValidPathInfo ServeProto::Serialise::read(const Sto return info; } -void ServeProto::Serialise::write(const Store & store, WriteConn conn, const UnkeyedValidPathInfo & info) +WireFormatGenerator ServeProto::Serialise::write(const Store & store, WriteConn conn, const UnkeyedValidPathInfo & info) { - conn.to - << (info.deriver ? store.printStorePath(*info.deriver) : ""); + co_yield (info.deriver ? store.printStorePath(*info.deriver) : ""); - ServeProto::write(store, conn, info.references); + co_yield ServeProto::write(store, conn, info.references); // !!! Maybe we want compression? - conn.to - << info.narSize // downloadSize, lie a little - << info.narSize; - if (GET_PROTOCOL_MINOR(conn.version) >= 4) - conn.to - << info.narHash.to_string(Base32, true) - << renderContentAddress(info.ca) - << info.sigs; + co_yield info.narSize; // downloadSize, lie a little + co_yield info.narSize; + if (GET_PROTOCOL_MINOR(conn.version) >= 4) { + co_yield info.narHash.to_string(Base32, true); + co_yield renderContentAddress(info.ca); + co_yield info.sigs; + } } } diff --git a/src/libstore/serve-protocol.hh b/src/libstore/serve-protocol.hh index 742320933..34c591a24 100644 --- a/src/libstore/serve-protocol.hh +++ b/src/libstore/serve-protocol.hh @@ -79,7 +79,7 @@ struct ServeProto #if 0 { static T read(const Store & store, ReadConn conn); - static void write(const Store & store, WriteConn conn, const T & t); + static WireFormatGenerator write(const Store & store, WriteConn conn, const T & t); }; #endif @@ -88,9 +88,10 @@ struct ServeProto * infer the type instead of having to write it down explicitly. */ template - static void write(const Store & store, WriteConn conn, const T & t) + [[nodiscard]] + static WireFormatGenerator write(const Store & store, WriteConn conn, const T & t) { - ServeProto::Serialise::write(store, conn, t); + return ServeProto::Serialise::write(store, conn, t); } }; @@ -142,7 +143,7 @@ inline std::ostream & operator << (std::ostream & s, ServeProto::Command op) struct ServeProto::Serialise< T > \ { \ static T read(const Store & store, ServeProto::ReadConn conn); \ - static void write(const Store & store, ServeProto::WriteConn conn, const T & t); \ + [[nodiscard]] static WireFormatGenerator write(const Store & store, ServeProto::WriteConn conn, const T & t); \ }; template<> diff --git a/src/libstore/worker-protocol-impl.hh b/src/libstore/worker-protocol-impl.hh index c043588d6..17c49385b 100644 --- a/src/libstore/worker-protocol-impl.hh +++ b/src/libstore/worker-protocol-impl.hh @@ -20,9 +20,9 @@ namespace nix { { \ return LengthPrefixedProtoHelper::read(store, conn); \ } \ - TEMPLATE void WorkerProto::Serialise< T >::write(const Store & store, WorkerProto::WriteConn conn, const T & t) \ + TEMPLATE [[nodiscard]] WireFormatGenerator WorkerProto::Serialise< T >::write(const Store & store, WorkerProto::WriteConn conn, const T & t) \ { \ - LengthPrefixedProtoHelper::write(store, conn, t); \ + return LengthPrefixedProtoHelper::write(store, conn, t); \ } WORKER_USE_LENGTH_PREFIX_SERIALISER(template, std::vector) @@ -46,9 +46,10 @@ struct WorkerProto::Serialise return CommonProto::Serialise::read(store, CommonProto::ReadConn { .from = conn.from }); } - static void write(const Store & store, WorkerProto::WriteConn conn, const T & t) + [[nodiscard]] + static WireFormatGenerator write(const Store & store, WorkerProto::WriteConn conn, const T & t) { - CommonProto::Serialise::write(store, + return CommonProto::Serialise::write(store, CommonProto::WriteConn { .to = conn.to }, t); } diff --git a/src/libstore/worker-protocol.cc b/src/libstore/worker-protocol.cc index 08c5c6b70..f85fba0ff 100644 --- a/src/libstore/worker-protocol.cc +++ b/src/libstore/worker-protocol.cc @@ -28,17 +28,17 @@ std::optional WorkerProto::Serialise>::r } } -void WorkerProto::Serialise>::write(const Store & store, WorkerProto::WriteConn conn, const std::optional & optTrusted) +WireFormatGenerator WorkerProto::Serialise>::write(const Store & store, WorkerProto::WriteConn conn, const std::optional & optTrusted) { if (!optTrusted) - conn.to << (uint8_t)0; + co_yield (uint8_t)0; else { switch (*optTrusted) { case Trusted: - conn.to << (uint8_t)1; + co_yield (uint8_t)1; break; case NotTrusted: - conn.to << (uint8_t)2; + co_yield (uint8_t)2; break; default: assert(false); @@ -57,23 +57,23 @@ DerivedPath WorkerProto::Serialise::read(const Store & store, Worke } } -void WorkerProto::Serialise::write(const Store & store, WorkerProto::WriteConn conn, const DerivedPath & req) +WireFormatGenerator WorkerProto::Serialise::write(const Store & store, WorkerProto::WriteConn conn, const DerivedPath & req) { if (GET_PROTOCOL_MINOR(conn.version) >= 30) { - conn.to << req.to_string_legacy(store); + co_yield req.to_string_legacy(store); } else { auto sOrDrvPath = StorePathWithOutputs::tryFromDerivedPath(req); - std::visit(overloaded { - [&](const StorePathWithOutputs & s) { - conn.to << s.to_string(store); + co_yield std::visit(overloaded { + [&](const StorePathWithOutputs & s) -> std::string { + return s.to_string(store); }, - [&](const StorePath & drvPath) { + [&](const StorePath & drvPath) -> std::string { throw Error("trying to request '%s', but daemon protocol %d.%d is too old (< 1.29) to request a derivation file", store.printStorePath(drvPath), GET_PROTOCOL_MAJOR(conn.version), GET_PROTOCOL_MINOR(conn.version)); }, - [&](std::monostate) { + [&](std::monostate) -> std::string { throw Error("wanted to build a derivation that is itself a build product, but protocols do not support that. Try upgrading the Nix implementation on the other end of this connection"); }, }, sOrDrvPath); @@ -91,10 +91,10 @@ KeyedBuildResult WorkerProto::Serialise::read(const Store & st }; } -void WorkerProto::Serialise::write(const Store & store, WorkerProto::WriteConn conn, const KeyedBuildResult & res) +WireFormatGenerator WorkerProto::Serialise::write(const Store & store, WorkerProto::WriteConn conn, const KeyedBuildResult & res) { - WorkerProto::write(store, conn, res.path); - WorkerProto::write(store, conn, static_cast(res)); + co_yield WorkerProto::write(store, conn, res.path); + co_yield WorkerProto::write(store, conn, static_cast(res)); } @@ -120,23 +120,21 @@ BuildResult WorkerProto::Serialise::read(const Store & store, Worke return res; } -void WorkerProto::Serialise::write(const Store & store, WorkerProto::WriteConn conn, const BuildResult & res) +WireFormatGenerator WorkerProto::Serialise::write(const Store & store, WorkerProto::WriteConn conn, const BuildResult & res) { - conn.to - << res.status - << res.errorMsg; + co_yield res.status; + co_yield res.errorMsg; if (GET_PROTOCOL_MINOR(conn.version) >= 29) { - conn.to - << res.timesBuilt - << res.isNonDeterministic - << res.startTime - << res.stopTime; + co_yield res.timesBuilt; + co_yield res.isNonDeterministic; + co_yield res.startTime; + co_yield res.stopTime; } if (GET_PROTOCOL_MINOR(conn.version) >= 28) { DrvOutputs builtOutputs; for (auto & [output, realisation] : res.builtOutputs) builtOutputs.insert_or_assign(realisation.id, realisation); - WorkerProto::write(store, conn, builtOutputs); + co_yield WorkerProto::write(store, conn, builtOutputs); } } @@ -150,10 +148,10 @@ ValidPathInfo WorkerProto::Serialise::read(const Store & store, R }; } -void WorkerProto::Serialise::write(const Store & store, WriteConn conn, const ValidPathInfo & pathInfo) +WireFormatGenerator WorkerProto::Serialise::write(const Store & store, WriteConn conn, const ValidPathInfo & pathInfo) { - WorkerProto::write(store, conn, pathInfo.path); - WorkerProto::write(store, conn, static_cast(pathInfo)); + co_yield WorkerProto::write(store, conn, pathInfo.path); + co_yield WorkerProto::write(store, conn, static_cast(pathInfo)); } @@ -173,18 +171,17 @@ UnkeyedValidPathInfo WorkerProto::Serialise::read(const St return info; } -void WorkerProto::Serialise::write(const Store & store, WriteConn conn, const UnkeyedValidPathInfo & pathInfo) +WireFormatGenerator WorkerProto::Serialise::write(const Store & store, WriteConn conn, const UnkeyedValidPathInfo & pathInfo) { - conn.to - << (pathInfo.deriver ? store.printStorePath(*pathInfo.deriver) : "") - << pathInfo.narHash.to_string(Base16, false); - WorkerProto::write(store, conn, pathInfo.references); - conn.to << pathInfo.registrationTime << pathInfo.narSize; + co_yield (pathInfo.deriver ? store.printStorePath(*pathInfo.deriver) : ""); + co_yield pathInfo.narHash.to_string(Base16, false); + co_yield WorkerProto::write(store, conn, pathInfo.references); + co_yield pathInfo.registrationTime; + co_yield pathInfo.narSize; - conn.to - << pathInfo.ultimate - << pathInfo.sigs - << renderContentAddress(pathInfo.ca); + co_yield pathInfo.ultimate; + co_yield pathInfo.sigs; + co_yield renderContentAddress(pathInfo.ca); } } diff --git a/src/libstore/worker-protocol.hh b/src/libstore/worker-protocol.hh index 36acf2a8d..9fb6d63e0 100644 --- a/src/libstore/worker-protocol.hh +++ b/src/libstore/worker-protocol.hh @@ -122,7 +122,7 @@ struct WorkerProto #if 0 { static T read(const Store & store, ReadConn conn); - static void write(const Store & store, WriteConn conn, const T & t); + static WireFormatGenerator write(const Store & store, WriteConn conn, const T & t); }; #endif @@ -131,9 +131,10 @@ struct WorkerProto * infer the type instead of having to write it down explicitly. */ template - static void write(const Store & store, WriteConn conn, const T & t) + [[nodiscard]] + static WireFormatGenerator write(const Store & store, WriteConn conn, const T & t) { - WorkerProto::Serialise::write(store, conn, t); + return WorkerProto::Serialise::write(store, conn, t); } }; @@ -219,7 +220,7 @@ inline std::ostream & operator << (std::ostream & s, WorkerProto::Op op) struct WorkerProto::Serialise< T > \ { \ static T read(const Store & store, WorkerProto::ReadConn conn); \ - static void write(const Store & store, WorkerProto::WriteConn conn, const T & t); \ + [[nodiscard]] static WireFormatGenerator write(const Store & store, WorkerProto::WriteConn conn, const T & t); \ }; template<> diff --git a/src/nix-store/nix-store.cc b/src/nix-store/nix-store.cc index f11e2fe4d..a2171a237 100644 --- a/src/nix-store/nix-store.cc +++ b/src/nix-store/nix-store.cc @@ -880,7 +880,8 @@ static void opServe(Strings opFlags, Strings opArgs) store->substitutePaths(paths); } - ServeProto::write(*store, wconn, store->queryValidPaths(paths)); + auto valid = store->queryValidPaths(paths); + wconn.to << ServeProto::write(*store, wconn, valid); break; } @@ -891,7 +892,7 @@ static void opServe(Strings opFlags, Strings opArgs) try { auto info = store->queryPathInfo(i); out << store->printStorePath(info->path); - ServeProto::write(*store, wconn, static_cast(*info)); + wconn.to << ServeProto::write(*store, wconn, static_cast(*info)); } catch (InvalidPath &) { } } @@ -950,7 +951,7 @@ static void opServe(Strings opFlags, Strings opArgs) MonitorFdHup monitor(in.fd); auto status = store->buildDerivation(drvPath, drv); - ServeProto::write(*store, wconn, status); + wconn.to << ServeProto::write(*store, wconn, status); break; } @@ -959,7 +960,7 @@ static void opServe(Strings opFlags, Strings opArgs) StorePathSet closure; store->computeFSClosure(ServeProto::Serialise::read(*store, rconn), closure, false, includeOutputs); - ServeProto::write(*store, wconn, closure); + wconn.to << ServeProto::write(*store, wconn, closure); break; } diff --git a/tests/unit/libstore/common-protocol.cc b/tests/unit/libstore/common-protocol.cc index a820cb1c2..1d25f81c7 100644 --- a/tests/unit/libstore/common-protocol.cc +++ b/tests/unit/libstore/common-protocol.cc @@ -50,7 +50,7 @@ public: auto file = goldenMaster(testStem); StringSink to; - CommonProto::write( + to << CommonProto::write( *store, CommonProto::WriteConn { .to = to }, value); diff --git a/tests/unit/libstore/protocol.hh b/tests/unit/libstore/protocol.hh index f480f4ad1..269c9f7aa 100644 --- a/tests/unit/libstore/protocol.hh +++ b/tests/unit/libstore/protocol.hh @@ -56,7 +56,7 @@ public: auto file = ProtoTest::goldenMaster(testStem); StringSink to; - Proto::write( + to << Proto::write( *LibStoreTest::store, typename Proto::WriteConn {to, version}, value);