Merge pull request #8547 from obsidiansystems/proto-cleanup-prep

Make a few changes in prepartion for deeper cleanup of the remote protocols
This commit is contained in:
Robert Hensing 2023-06-19 20:56:24 +02:00 committed by GitHub
commit 3ee86307ab
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 717 additions and 408 deletions

View file

@ -106,6 +106,7 @@
- [Hacking](contributing/hacking.md) - [Hacking](contributing/hacking.md)
- [Experimental Features](contributing/experimental-features.md) - [Experimental Features](contributing/experimental-features.md)
- [CLI guideline](contributing/cli-guideline.md) - [CLI guideline](contributing/cli-guideline.md)
- [C++ style guide](contributing/cxx.md)
- [Release Notes](release-notes/release-notes.md) - [Release Notes](release-notes/release-notes.md)
- [Release X.Y (202?-??-??)](release-notes/rl-next.md) - [Release X.Y (202?-??-??)](release-notes/rl-next.md)
- [Release 2.16 (2023-05-31)](release-notes/rl-2.16.md) - [Release 2.16 (2023-05-31)](release-notes/rl-2.16.md)

View file

@ -0,0 +1,28 @@
# C++ style guide
Some miscellaneous notes on how we write C++.
Formatting we hope to eventually normalize automatically, so this section is free to just discuss higher-level concerns.
## The `*-impl.hh` pattern
Let's start with some background info first.
Headers, are supposed to contain declarations, not definitions.
This allows us to change a definition without changing the declaration, and have a very small rebuild during development.
Templates, however, need to be specialized to use-sites.
Absent fancier techniques, templates require that the definition, not just mere declaration, must be available at use-sites in order to make that specialization on the fly as part of compiling those use-sites.
Making definitions available like that means putting them in headers, but that is unfortunately means we get all the extra rebuilds we want to avoid by just putting declarations there as described above.
The `*-impl.hh` pattern is a ham-fisted partial solution to this problem.
It constitutes:
- Declaring items only in the main `foo.hh`, including templates.
- Putting template definitions in a companion `foo-impl.hh` header.
Most C++ developers would accompany this by having `foo.hh` include `foo-impl.hh`, to ensure any file getting the template declarations also got the template definitions.
But we've found not doing this has some benefits and fewer than imagined downsides.
The fact remains that headers are rarely as minimal as they could be;
there is often code that needs declarations from the headers but not the templates within them.
With our pattern where `foo.hh` doesn't include `foo-impl.hh`, that means they can just include `foo.hh`
Code that needs both just includes `foo.hh` and `foo-impl.hh`.
This does make linking error possible where something forgets to include `foo-impl.hh` that needs it, but those are build-time only as easy to fix.

View file

@ -299,7 +299,7 @@ connected:
!trusted || *trusted; !trusted || *trusted;
}); });
// See the very large comment in `case wopBuildDerivation:` in // See the very large comment in `case WorkerProto::Op::BuildDerivation:` in
// `src/libstore/daemon.cc` that explains the trust model here. // `src/libstore/daemon.cc` that explains the trust model here.
// //
// This condition mirrors that: that code enforces the "rules" outlined there; // This condition mirrors that: that code enforces the "rules" outlined there;

View file

@ -9,6 +9,7 @@
#include "archive.hh" #include "archive.hh"
#include "compression.hh" #include "compression.hh"
#include "worker-protocol.hh" #include "worker-protocol.hh"
#include "worker-protocol-impl.hh"
#include "topo-sort.hh" #include "topo-sort.hh"
#include "callback.hh" #include "callback.hh"
#include "local-store.hh" // TODO remove, along with remaining downcasts #include "local-store.hh" // TODO remove, along with remaining downcasts
@ -1150,9 +1151,11 @@ HookReply DerivationGoal::tryBuildHook()
throw; throw;
} }
WorkerProto::WriteConn conn { hook->sink };
/* Tell the hook all the inputs that have to be copied to the /* Tell the hook all the inputs that have to be copied to the
remote system. */ remote system. */
workerProtoWrite(worker.store, hook->sink, inputPaths); WorkerProto::write(worker.store, conn, inputPaths);
/* Tell the hooks the missing outputs that have to be copied back /* Tell the hooks the missing outputs that have to be copied back
from the remote system. */ from the remote system. */
@ -1163,7 +1166,7 @@ HookReply DerivationGoal::tryBuildHook()
if (buildMode != bmCheck && status.known && status.known->isValid()) continue; if (buildMode != bmCheck && status.known && status.known->isValid()) continue;
missingOutputs.insert(outputName); missingOutputs.insert(outputName);
} }
workerProtoWrite(worker.store, hook->sink, missingOutputs); WorkerProto::write(worker.store, conn, missingOutputs);
} }
hook->sink = FdSink(); hook->sink = FdSink();

View file

@ -10,7 +10,6 @@
#include "archive.hh" #include "archive.hh"
#include "compression.hh" #include "compression.hh"
#include "daemon.hh" #include "daemon.hh"
#include "worker-protocol.hh"
#include "topo-sort.hh" #include "topo-sort.hh"
#include "callback.hh" #include "callback.hh"
#include "json-utils.hh" #include "json-utils.hh"

View file

@ -1,6 +1,7 @@
#include "daemon.hh" #include "daemon.hh"
#include "monitor-fd.hh" #include "monitor-fd.hh"
#include "worker-protocol.hh" #include "worker-protocol.hh"
#include "worker-protocol-impl.hh"
#include "build-result.hh" #include "build-result.hh"
#include "store-api.hh" #include "store-api.hh"
#include "store-cast.hh" #include "store-cast.hh"
@ -259,13 +260,13 @@ struct ClientSettings
} }
}; };
static std::vector<DerivedPath> readDerivedPaths(Store & store, unsigned int clientVersion, Source & from) static std::vector<DerivedPath> readDerivedPaths(Store & store, unsigned int clientVersion, WorkerProto::ReadConn conn)
{ {
std::vector<DerivedPath> reqs; std::vector<DerivedPath> reqs;
if (GET_PROTOCOL_MINOR(clientVersion) >= 30) { if (GET_PROTOCOL_MINOR(clientVersion) >= 30) {
reqs = WorkerProto<std::vector<DerivedPath>>::read(store, from); reqs = WorkerProto::Serialise<std::vector<DerivedPath>>::read(store, conn);
} else { } else {
for (auto & s : readStrings<Strings>(from)) for (auto & s : readStrings<Strings>(conn.from))
reqs.push_back(parsePathWithOutputs(store, s).toDerivedPath()); reqs.push_back(parsePathWithOutputs(store, s).toDerivedPath());
} }
return reqs; return reqs;
@ -273,11 +274,14 @@ static std::vector<DerivedPath> readDerivedPaths(Store & store, unsigned int cli
static void performOp(TunnelLogger * logger, ref<Store> store, static void performOp(TunnelLogger * logger, ref<Store> store,
TrustedFlag trusted, RecursiveFlag recursive, unsigned int clientVersion, TrustedFlag trusted, RecursiveFlag recursive, unsigned int clientVersion,
Source & from, BufferedSink & to, unsigned int op) Source & from, BufferedSink & to, WorkerProto::Op op)
{ {
WorkerProto::ReadConn rconn { .from = from };
WorkerProto::WriteConn wconn { .to = to };
switch (op) { switch (op) {
case wopIsValidPath: { case WorkerProto::Op::IsValidPath: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(from));
logger->startWork(); logger->startWork();
bool result = store->isValidPath(path); bool result = store->isValidPath(path);
@ -286,8 +290,8 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
break; break;
} }
case wopQueryValidPaths: { case WorkerProto::Op::QueryValidPaths: {
auto paths = WorkerProto<StorePathSet>::read(*store, from); auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
SubstituteFlag substitute = NoSubstitute; SubstituteFlag substitute = NoSubstitute;
if (GET_PROTOCOL_MINOR(clientVersion) >= 27) { if (GET_PROTOCOL_MINOR(clientVersion) >= 27) {
@ -300,11 +304,11 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
} }
auto res = store->queryValidPaths(paths, substitute); auto res = store->queryValidPaths(paths, substitute);
logger->stopWork(); logger->stopWork();
workerProtoWrite(*store, to, res); WorkerProto::write(*store, wconn, res);
break; break;
} }
case wopHasSubstitutes: { case WorkerProto::Op::HasSubstitutes: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(from));
logger->startWork(); logger->startWork();
StorePathSet paths; // FIXME StorePathSet paths; // FIXME
@ -315,16 +319,16 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
break; break;
} }
case wopQuerySubstitutablePaths: { case WorkerProto::Op::QuerySubstitutablePaths: {
auto paths = WorkerProto<StorePathSet>::read(*store, from); auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
logger->startWork(); logger->startWork();
auto res = store->querySubstitutablePaths(paths); auto res = store->querySubstitutablePaths(paths);
logger->stopWork(); logger->stopWork();
workerProtoWrite(*store, to, res); WorkerProto::write(*store, wconn, res);
break; break;
} }
case wopQueryPathHash: { case WorkerProto::Op::QueryPathHash: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(from));
logger->startWork(); logger->startWork();
auto hash = store->queryPathInfo(path)->narHash; auto hash = store->queryPathInfo(path)->narHash;
@ -333,27 +337,27 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
break; break;
} }
case wopQueryReferences: case WorkerProto::Op::QueryReferences:
case wopQueryReferrers: case WorkerProto::Op::QueryReferrers:
case wopQueryValidDerivers: case WorkerProto::Op::QueryValidDerivers:
case wopQueryDerivationOutputs: { case WorkerProto::Op::QueryDerivationOutputs: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(from));
logger->startWork(); logger->startWork();
StorePathSet paths; StorePathSet paths;
if (op == wopQueryReferences) if (op == WorkerProto::Op::QueryReferences)
for (auto & i : store->queryPathInfo(path)->references) for (auto & i : store->queryPathInfo(path)->references)
paths.insert(i); paths.insert(i);
else if (op == wopQueryReferrers) else if (op == WorkerProto::Op::QueryReferrers)
store->queryReferrers(path, paths); store->queryReferrers(path, paths);
else if (op == wopQueryValidDerivers) else if (op == WorkerProto::Op::QueryValidDerivers)
paths = store->queryValidDerivers(path); paths = store->queryValidDerivers(path);
else paths = store->queryDerivationOutputs(path); else paths = store->queryDerivationOutputs(path);
logger->stopWork(); logger->stopWork();
workerProtoWrite(*store, to, paths); WorkerProto::write(*store, wconn, paths);
break; break;
} }
case wopQueryDerivationOutputNames: { case WorkerProto::Op::QueryDerivationOutputNames: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(from));
logger->startWork(); logger->startWork();
auto names = store->readDerivation(path).outputNames(); auto names = store->readDerivation(path).outputNames();
@ -362,16 +366,16 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
break; break;
} }
case wopQueryDerivationOutputMap: { case WorkerProto::Op::QueryDerivationOutputMap: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(from));
logger->startWork(); logger->startWork();
auto outputs = store->queryPartialDerivationOutputMap(path); auto outputs = store->queryPartialDerivationOutputMap(path);
logger->stopWork(); logger->stopWork();
workerProtoWrite(*store, to, outputs); WorkerProto::write(*store, wconn, outputs);
break; break;
} }
case wopQueryDeriver: { case WorkerProto::Op::QueryDeriver: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(from));
logger->startWork(); logger->startWork();
auto info = store->queryPathInfo(path); auto info = store->queryPathInfo(path);
@ -380,7 +384,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
break; break;
} }
case wopQueryPathFromHashPart: { case WorkerProto::Op::QueryPathFromHashPart: {
auto hashPart = readString(from); auto hashPart = readString(from);
logger->startWork(); logger->startWork();
auto path = store->queryPathFromHashPart(hashPart); auto path = store->queryPathFromHashPart(hashPart);
@ -389,11 +393,11 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
break; break;
} }
case wopAddToStore: { case WorkerProto::Op::AddToStore: {
if (GET_PROTOCOL_MINOR(clientVersion) >= 25) { if (GET_PROTOCOL_MINOR(clientVersion) >= 25) {
auto name = readString(from); auto name = readString(from);
auto camStr = readString(from); auto camStr = readString(from);
auto refs = WorkerProto<StorePathSet>::read(*store, from); auto refs = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
bool repairBool; bool repairBool;
from >> repairBool; from >> repairBool;
auto repair = RepairFlag{repairBool}; auto repair = RepairFlag{repairBool};
@ -475,7 +479,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
break; break;
} }
case wopAddMultipleToStore: { case WorkerProto::Op::AddMultipleToStore: {
bool repair, dontCheckSigs; bool repair, dontCheckSigs;
from >> repair >> dontCheckSigs; from >> repair >> dontCheckSigs;
if (!trusted && dontCheckSigs) if (!trusted && dontCheckSigs)
@ -492,10 +496,10 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
break; break;
} }
case wopAddTextToStore: { case WorkerProto::Op::AddTextToStore: {
std::string suffix = readString(from); std::string suffix = readString(from);
std::string s = readString(from); std::string s = readString(from);
auto refs = WorkerProto<StorePathSet>::read(*store, from); auto refs = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
logger->startWork(); logger->startWork();
auto path = store->addTextToStore(suffix, s, refs, NoRepair); auto path = store->addTextToStore(suffix, s, refs, NoRepair);
logger->stopWork(); logger->stopWork();
@ -503,7 +507,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
break; break;
} }
case wopExportPath: { case WorkerProto::Op::ExportPath: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(from));
readInt(from); // obsolete readInt(from); // obsolete
logger->startWork(); logger->startWork();
@ -514,7 +518,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
break; break;
} }
case wopImportPaths: { case WorkerProto::Op::ImportPaths: {
logger->startWork(); logger->startWork();
TunnelSource source(from, to); TunnelSource source(from, to);
auto paths = store->importPaths(source, auto paths = store->importPaths(source,
@ -526,8 +530,8 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
break; break;
} }
case wopBuildPaths: { case WorkerProto::Op::BuildPaths: {
auto drvs = readDerivedPaths(*store, clientVersion, from); auto drvs = readDerivedPaths(*store, clientVersion, rconn);
BuildMode mode = bmNormal; BuildMode mode = bmNormal;
if (GET_PROTOCOL_MINOR(clientVersion) >= 15) { if (GET_PROTOCOL_MINOR(clientVersion) >= 15) {
mode = (BuildMode) readInt(from); mode = (BuildMode) readInt(from);
@ -551,8 +555,8 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
break; break;
} }
case wopBuildPathsWithResults: { case WorkerProto::Op::BuildPathsWithResults: {
auto drvs = readDerivedPaths(*store, clientVersion, from); auto drvs = readDerivedPaths(*store, clientVersion, rconn);
BuildMode mode = bmNormal; BuildMode mode = bmNormal;
mode = (BuildMode) readInt(from); mode = (BuildMode) readInt(from);
@ -567,12 +571,12 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
auto results = store->buildPathsWithResults(drvs, mode); auto results = store->buildPathsWithResults(drvs, mode);
logger->stopWork(); logger->stopWork();
workerProtoWrite(*store, to, results); WorkerProto::write(*store, wconn, results);
break; break;
} }
case wopBuildDerivation: { case WorkerProto::Op::BuildDerivation: {
auto drvPath = store->parseStorePath(readString(from)); auto drvPath = store->parseStorePath(readString(from));
BasicDerivation drv; BasicDerivation drv;
readDerivation(from, *store, drv, Derivation::nameFromPath(drvPath)); readDerivation(from, *store, drv, Derivation::nameFromPath(drvPath));
@ -644,12 +648,12 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
DrvOutputs builtOutputs; DrvOutputs builtOutputs;
for (auto & [output, realisation] : res.builtOutputs) for (auto & [output, realisation] : res.builtOutputs)
builtOutputs.insert_or_assign(realisation.id, realisation); builtOutputs.insert_or_assign(realisation.id, realisation);
workerProtoWrite(*store, to, builtOutputs); WorkerProto::write(*store, wconn, builtOutputs);
} }
break; break;
} }
case wopEnsurePath: { case WorkerProto::Op::EnsurePath: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(from));
logger->startWork(); logger->startWork();
store->ensurePath(path); store->ensurePath(path);
@ -658,7 +662,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
break; break;
} }
case wopAddTempRoot: { case WorkerProto::Op::AddTempRoot: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(from));
logger->startWork(); logger->startWork();
store->addTempRoot(path); store->addTempRoot(path);
@ -667,7 +671,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
break; break;
} }
case wopAddIndirectRoot: { case WorkerProto::Op::AddIndirectRoot: {
Path path = absPath(readString(from)); Path path = absPath(readString(from));
logger->startWork(); logger->startWork();
@ -680,14 +684,14 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
} }
// Obsolete. // Obsolete.
case wopSyncWithGC: { case WorkerProto::Op::SyncWithGC: {
logger->startWork(); logger->startWork();
logger->stopWork(); logger->stopWork();
to << 1; to << 1;
break; break;
} }
case wopFindRoots: { case WorkerProto::Op::FindRoots: {
logger->startWork(); logger->startWork();
auto & gcStore = require<GcStore>(*store); auto & gcStore = require<GcStore>(*store);
Roots roots = gcStore.findRoots(!trusted); Roots roots = gcStore.findRoots(!trusted);
@ -706,10 +710,10 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
break; break;
} }
case wopCollectGarbage: { case WorkerProto::Op::CollectGarbage: {
GCOptions options; GCOptions options;
options.action = (GCOptions::GCAction) readInt(from); options.action = (GCOptions::GCAction) readInt(from);
options.pathsToDelete = WorkerProto<StorePathSet>::read(*store, from); options.pathsToDelete = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
from >> options.ignoreLiveness >> options.maxFreed; from >> options.ignoreLiveness >> options.maxFreed;
// obsolete fields // obsolete fields
readInt(from); readInt(from);
@ -730,7 +734,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
break; break;
} }
case wopSetOptions: { case WorkerProto::Op::SetOptions: {
ClientSettings clientSettings; ClientSettings clientSettings;
@ -767,7 +771,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
break; break;
} }
case wopQuerySubstitutablePathInfo: { case WorkerProto::Op::QuerySubstitutablePathInfo: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(from));
logger->startWork(); logger->startWork();
SubstitutablePathInfos infos; SubstitutablePathInfos infos;
@ -779,22 +783,22 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
else { else {
to << 1 to << 1
<< (i->second.deriver ? store->printStorePath(*i->second.deriver) : ""); << (i->second.deriver ? store->printStorePath(*i->second.deriver) : "");
workerProtoWrite(*store, to, i->second.references); WorkerProto::write(*store, wconn, i->second.references);
to << i->second.downloadSize to << i->second.downloadSize
<< i->second.narSize; << i->second.narSize;
} }
break; break;
} }
case wopQuerySubstitutablePathInfos: { case WorkerProto::Op::QuerySubstitutablePathInfos: {
SubstitutablePathInfos infos; SubstitutablePathInfos infos;
StorePathCAMap pathsMap = {}; StorePathCAMap pathsMap = {};
if (GET_PROTOCOL_MINOR(clientVersion) < 22) { if (GET_PROTOCOL_MINOR(clientVersion) < 22) {
auto paths = WorkerProto<StorePathSet>::read(*store, from); auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
for (auto & path : paths) for (auto & path : paths)
pathsMap.emplace(path, std::nullopt); pathsMap.emplace(path, std::nullopt);
} else } else
pathsMap = WorkerProto<StorePathCAMap>::read(*store, from); pathsMap = WorkerProto::Serialise<StorePathCAMap>::read(*store, rconn);
logger->startWork(); logger->startWork();
store->querySubstitutablePathInfos(pathsMap, infos); store->querySubstitutablePathInfos(pathsMap, infos);
logger->stopWork(); logger->stopWork();
@ -802,21 +806,21 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
for (auto & i : infos) { for (auto & i : infos) {
to << store->printStorePath(i.first) to << store->printStorePath(i.first)
<< (i.second.deriver ? store->printStorePath(*i.second.deriver) : ""); << (i.second.deriver ? store->printStorePath(*i.second.deriver) : "");
workerProtoWrite(*store, to, i.second.references); WorkerProto::write(*store, wconn, i.second.references);
to << i.second.downloadSize << i.second.narSize; to << i.second.downloadSize << i.second.narSize;
} }
break; break;
} }
case wopQueryAllValidPaths: { case WorkerProto::Op::QueryAllValidPaths: {
logger->startWork(); logger->startWork();
auto paths = store->queryAllValidPaths(); auto paths = store->queryAllValidPaths();
logger->stopWork(); logger->stopWork();
workerProtoWrite(*store, to, paths); WorkerProto::write(*store, wconn, paths);
break; break;
} }
case wopQueryPathInfo: { case WorkerProto::Op::QueryPathInfo: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(from));
std::shared_ptr<const ValidPathInfo> info; std::shared_ptr<const ValidPathInfo> info;
logger->startWork(); logger->startWork();
@ -837,14 +841,14 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
break; break;
} }
case wopOptimiseStore: case WorkerProto::Op::OptimiseStore:
logger->startWork(); logger->startWork();
store->optimiseStore(); store->optimiseStore();
logger->stopWork(); logger->stopWork();
to << 1; to << 1;
break; break;
case wopVerifyStore: { case WorkerProto::Op::VerifyStore: {
bool checkContents, repair; bool checkContents, repair;
from >> checkContents >> repair; from >> checkContents >> repair;
logger->startWork(); logger->startWork();
@ -856,7 +860,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
break; break;
} }
case wopAddSignatures: { case WorkerProto::Op::AddSignatures: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(from));
StringSet sigs = readStrings<StringSet>(from); StringSet sigs = readStrings<StringSet>(from);
logger->startWork(); logger->startWork();
@ -868,7 +872,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
break; break;
} }
case wopNarFromPath: { case WorkerProto::Op::NarFromPath: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(from));
logger->startWork(); logger->startWork();
logger->stopWork(); logger->stopWork();
@ -876,7 +880,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
break; break;
} }
case wopAddToStoreNar: { case WorkerProto::Op::AddToStoreNar: {
bool repair, dontCheckSigs; bool repair, dontCheckSigs;
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(from));
auto deriver = readString(from); auto deriver = readString(from);
@ -884,7 +888,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
ValidPathInfo info { path, narHash }; ValidPathInfo info { path, narHash };
if (deriver != "") if (deriver != "")
info.deriver = store->parseStorePath(deriver); info.deriver = store->parseStorePath(deriver);
info.references = WorkerProto<StorePathSet>::read(*store, from); info.references = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
from >> info.registrationTime >> info.narSize >> info.ultimate; from >> info.registrationTime >> info.narSize >> info.ultimate;
info.sigs = readStrings<StringSet>(from); info.sigs = readStrings<StringSet>(from);
info.ca = ContentAddress::parseOpt(readString(from)); info.ca = ContentAddress::parseOpt(readString(from));
@ -928,21 +932,21 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
break; break;
} }
case wopQueryMissing: { case WorkerProto::Op::QueryMissing: {
auto targets = readDerivedPaths(*store, clientVersion, from); auto targets = readDerivedPaths(*store, clientVersion, rconn);
logger->startWork(); logger->startWork();
StorePathSet willBuild, willSubstitute, unknown; StorePathSet willBuild, willSubstitute, unknown;
uint64_t downloadSize, narSize; uint64_t downloadSize, narSize;
store->queryMissing(targets, willBuild, willSubstitute, unknown, downloadSize, narSize); store->queryMissing(targets, willBuild, willSubstitute, unknown, downloadSize, narSize);
logger->stopWork(); logger->stopWork();
workerProtoWrite(*store, to, willBuild); WorkerProto::write(*store, wconn, willBuild);
workerProtoWrite(*store, to, willSubstitute); WorkerProto::write(*store, wconn, willSubstitute);
workerProtoWrite(*store, to, unknown); WorkerProto::write(*store, wconn, unknown);
to << downloadSize << narSize; to << downloadSize << narSize;
break; break;
} }
case wopRegisterDrvOutput: { case WorkerProto::Op::RegisterDrvOutput: {
logger->startWork(); logger->startWork();
if (GET_PROTOCOL_MINOR(clientVersion) < 31) { if (GET_PROTOCOL_MINOR(clientVersion) < 31) {
auto outputId = DrvOutput::parse(readString(from)); auto outputId = DrvOutput::parse(readString(from));
@ -950,14 +954,14 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
store->registerDrvOutput(Realisation{ store->registerDrvOutput(Realisation{
.id = outputId, .outPath = outputPath}); .id = outputId, .outPath = outputPath});
} else { } else {
auto realisation = WorkerProto<Realisation>::read(*store, from); auto realisation = WorkerProto::Serialise<Realisation>::read(*store, rconn);
store->registerDrvOutput(realisation); store->registerDrvOutput(realisation);
} }
logger->stopWork(); logger->stopWork();
break; break;
} }
case wopQueryRealisation: { case WorkerProto::Op::QueryRealisation: {
logger->startWork(); logger->startWork();
auto outputId = DrvOutput::parse(readString(from)); auto outputId = DrvOutput::parse(readString(from));
auto info = store->queryRealisation(outputId); auto info = store->queryRealisation(outputId);
@ -965,16 +969,16 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
if (GET_PROTOCOL_MINOR(clientVersion) < 31) { if (GET_PROTOCOL_MINOR(clientVersion) < 31) {
std::set<StorePath> outPaths; std::set<StorePath> outPaths;
if (info) outPaths.insert(info->outPath); if (info) outPaths.insert(info->outPath);
workerProtoWrite(*store, to, outPaths); WorkerProto::write(*store, wconn, outPaths);
} else { } else {
std::set<Realisation> realisations; std::set<Realisation> realisations;
if (info) realisations.insert(*info); if (info) realisations.insert(*info);
workerProtoWrite(*store, to, realisations); WorkerProto::write(*store, wconn, realisations);
} }
break; break;
} }
case wopAddBuildLog: { case WorkerProto::Op::AddBuildLog: {
StorePath path{readString(from)}; StorePath path{readString(from)};
logger->startWork(); logger->startWork();
if (!trusted) if (!trusted)
@ -991,6 +995,10 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
break; break;
} }
case WorkerProto::Op::QueryFailedPaths:
case WorkerProto::Op::ClearFailedPaths:
throw Error("Removed operation %1%", op);
default: default:
throw Error("invalid operation %1%", op); throw Error("invalid operation %1%", op);
} }
@ -1045,7 +1053,8 @@ void processConnection(
auto temp = trusted auto temp = trusted
? store->isTrustedClient() ? store->isTrustedClient()
: std::optional { NotTrusted }; : std::optional { NotTrusted };
workerProtoWrite(*store, to, temp); WorkerProto::WriteConn wconn { .to = to };
WorkerProto::write(*store, wconn, temp);
} }
/* Send startup error messages to the client. */ /* Send startup error messages to the client. */
@ -1058,9 +1067,9 @@ void processConnection(
/* Process client requests. */ /* Process client requests. */
while (true) { while (true) {
WorkerOp op; WorkerProto::Op op;
try { try {
op = (WorkerOp) readInt(from); op = (enum WorkerProto::Op) readInt(from);
} catch (Interrupted & e) { } catch (Interrupted & e) {
break; break;
} catch (EndOfFile & e) { } catch (EndOfFile & e) {

View file

@ -5,6 +5,7 @@
#include "util.hh" #include "util.hh"
#include "split.hh" #include "split.hh"
#include "worker-protocol.hh" #include "worker-protocol.hh"
#include "worker-protocol-impl.hh"
#include "fs-accessor.hh" #include "fs-accessor.hh"
#include <boost/container/small_vector.hpp> #include <boost/container/small_vector.hpp>
#include <nlohmann/json.hpp> #include <nlohmann/json.hpp>
@ -749,7 +750,8 @@ Source & readDerivation(Source & in, const Store & store, BasicDerivation & drv,
drv.outputs.emplace(std::move(name), std::move(output)); drv.outputs.emplace(std::move(name), std::move(output));
} }
drv.inputSrcs = WorkerProto<StorePathSet>::read(store, in); drv.inputSrcs = WorkerProto::Serialise<StorePathSet>::read(store,
WorkerProto::ReadConn { .from = in });
in >> drv.platform >> drv.builder; in >> drv.platform >> drv.builder;
drv.args = readStrings<Strings>(in); drv.args = readStrings<Strings>(in);
@ -797,7 +799,9 @@ void writeDerivation(Sink & out, const Store & store, const BasicDerivation & dr
}, },
}, i.second.raw()); }, i.second.raw());
} }
workerProtoWrite(store, out, drv.inputSrcs); WorkerProto::write(store,
WorkerProto::WriteConn { .to = out },
drv.inputSrcs);
out << drv.platform << drv.builder << drv.args; out << drv.platform << drv.builder << drv.args;
out << drv.env.size(); out << drv.env.size();
for (auto & i : drv.env) for (auto & i : drv.env)

View file

@ -2,6 +2,7 @@
#include "store-api.hh" #include "store-api.hh"
#include "archive.hh" #include "archive.hh"
#include "worker-protocol.hh" #include "worker-protocol.hh"
#include "worker-protocol-impl.hh"
#include <algorithm> #include <algorithm>
@ -45,7 +46,9 @@ void Store::exportPath(const StorePath & path, Sink & sink)
teeSink teeSink
<< exportMagic << exportMagic
<< printStorePath(path); << printStorePath(path);
workerProtoWrite(*this, teeSink, info->references); WorkerProto::write(*this,
WorkerProto::WriteConn { .to = teeSink },
info->references);
teeSink teeSink
<< (info->deriver ? printStorePath(*info->deriver) : "") << (info->deriver ? printStorePath(*info->deriver) : "")
<< 0; << 0;
@ -73,7 +76,8 @@ StorePaths Store::importPaths(Source & source, CheckSigsFlag checkSigs)
//Activity act(*logger, lvlInfo, "importing path '%s'", info.path); //Activity act(*logger, lvlInfo, "importing path '%s'", info.path);
auto references = WorkerProto<StorePathSet>::read(*this, source); auto references = WorkerProto::Serialise<StorePathSet>::read(*this,
WorkerProto::ReadConn { .from = source });
auto deriver = readString(source); auto deriver = readString(source);
auto narHash = hashString(htSHA256, saved.s); auto narHash = hashString(htSHA256, saved.s);

View file

@ -7,6 +7,7 @@
#include "store-api.hh" #include "store-api.hh"
#include "path-with-outputs.hh" #include "path-with-outputs.hh"
#include "worker-protocol.hh" #include "worker-protocol.hh"
#include "worker-protocol-impl.hh"
#include "ssh.hh" #include "ssh.hh"
#include "derivations.hh" #include "derivations.hh"
#include "callback.hh" #include "callback.hh"
@ -47,6 +48,42 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor
FdSource from; FdSource from;
int remoteVersion; int remoteVersion;
bool good = true; bool good = true;
/**
* Coercion to `WorkerProto::ReadConn`. This makes it easy to use the
* factored out worker protocol searlizers with a
* `LegacySSHStore::Connection`.
*
* The worker protocol connection types are unidirectional, unlike
* this type.
*
* @todo Use server protocol serializers, not worker protocol
* serializers, once we have made that distiction.
*/
operator WorkerProto::ReadConn ()
{
return WorkerProto::ReadConn {
.from = from,
};
}
/*
* Coercion to `WorkerProto::WriteConn`. This makes it easy to use the
* factored out worker protocol searlizers with a
* `LegacySSHStore::Connection`.
*
* The worker protocol connection types are unidirectional, unlike
* this type.
*
* @todo Use server protocol serializers, not worker protocol
* serializers, once we have made that distiction.
*/
operator WorkerProto::WriteConn ()
{
return WorkerProto::WriteConn {
.to = to,
};
}
}; };
std::string host; std::string host;
@ -133,7 +170,7 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor
debug("querying remote host '%s' for info on '%s'", host, printStorePath(path)); debug("querying remote host '%s' for info on '%s'", host, printStorePath(path));
conn->to << cmdQueryPathInfos << PathSet{printStorePath(path)}; conn->to << ServeProto::Command::QueryPathInfos << PathSet{printStorePath(path)};
conn->to.flush(); conn->to.flush();
auto p = readString(conn->from); auto p = readString(conn->from);
@ -146,7 +183,7 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor
auto deriver = readString(conn->from); auto deriver = readString(conn->from);
if (deriver != "") if (deriver != "")
info->deriver = parseStorePath(deriver); info->deriver = parseStorePath(deriver);
info->references = WorkerProto<StorePathSet>::read(*this, conn->from); info->references = WorkerProto::Serialise<StorePathSet>::read(*this, *conn);
readLongLong(conn->from); // download size readLongLong(conn->from); // download size
info->narSize = readLongLong(conn->from); info->narSize = readLongLong(conn->from);
@ -176,11 +213,11 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor
if (GET_PROTOCOL_MINOR(conn->remoteVersion) >= 5) { if (GET_PROTOCOL_MINOR(conn->remoteVersion) >= 5) {
conn->to conn->to
<< cmdAddToStoreNar << ServeProto::Command::AddToStoreNar
<< printStorePath(info.path) << printStorePath(info.path)
<< (info.deriver ? printStorePath(*info.deriver) : "") << (info.deriver ? printStorePath(*info.deriver) : "")
<< info.narHash.to_string(Base16, false); << info.narHash.to_string(Base16, false);
workerProtoWrite(*this, conn->to, info.references); WorkerProto::write(*this, *conn, info.references);
conn->to conn->to
<< info.registrationTime << info.registrationTime
<< info.narSize << info.narSize
@ -198,7 +235,7 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor
} else { } else {
conn->to conn->to
<< cmdImportPaths << ServeProto::Command::ImportPaths
<< 1; << 1;
try { try {
copyNAR(source, conn->to); copyNAR(source, conn->to);
@ -209,7 +246,7 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor
conn->to conn->to
<< exportMagic << exportMagic
<< printStorePath(info.path); << printStorePath(info.path);
workerProtoWrite(*this, conn->to, info.references); WorkerProto::write(*this, *conn, info.references);
conn->to conn->to
<< (info.deriver ? printStorePath(*info.deriver) : "") << (info.deriver ? printStorePath(*info.deriver) : "")
<< 0 << 0
@ -226,7 +263,7 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor
{ {
auto conn(connections->get()); auto conn(connections->get());
conn->to << cmdDumpStorePath << printStorePath(path); conn->to << ServeProto::Command::DumpStorePath << printStorePath(path);
conn->to.flush(); conn->to.flush();
copyNAR(conn->from, sink); copyNAR(conn->from, sink);
} }
@ -279,7 +316,7 @@ public:
auto conn(connections->get()); auto conn(connections->get());
conn->to conn->to
<< cmdBuildDerivation << ServeProto::Command::BuildDerivation
<< printStorePath(drvPath); << printStorePath(drvPath);
writeDerivation(conn->to, *this, drv); writeDerivation(conn->to, *this, drv);
@ -294,7 +331,7 @@ public:
if (GET_PROTOCOL_MINOR(conn->remoteVersion) >= 3) if (GET_PROTOCOL_MINOR(conn->remoteVersion) >= 3)
conn->from >> status.timesBuilt >> status.isNonDeterministic >> status.startTime >> status.stopTime; conn->from >> status.timesBuilt >> status.isNonDeterministic >> status.startTime >> status.stopTime;
if (GET_PROTOCOL_MINOR(conn->remoteVersion) >= 6) { if (GET_PROTOCOL_MINOR(conn->remoteVersion) >= 6) {
auto builtOutputs = WorkerProto<DrvOutputs>::read(*this, conn->from); auto builtOutputs = WorkerProto::Serialise<DrvOutputs>::read(*this, *conn);
for (auto && [output, realisation] : builtOutputs) for (auto && [output, realisation] : builtOutputs)
status.builtOutputs.insert_or_assign( status.builtOutputs.insert_or_assign(
std::move(output.outputName), std::move(output.outputName),
@ -310,7 +347,7 @@ public:
auto conn(connections->get()); auto conn(connections->get());
conn->to << cmdBuildPaths; conn->to << ServeProto::Command::BuildPaths;
Strings ss; Strings ss;
for (auto & p : drvPaths) { for (auto & p : drvPaths) {
auto sOrDrvPath = StorePathWithOutputs::tryFromDerivedPath(p); auto sOrDrvPath = StorePathWithOutputs::tryFromDerivedPath(p);
@ -367,12 +404,12 @@ public:
auto conn(connections->get()); auto conn(connections->get());
conn->to conn->to
<< cmdQueryClosure << ServeProto::Command::QueryClosure
<< includeOutputs; << includeOutputs;
workerProtoWrite(*this, conn->to, paths); WorkerProto::write(*this, *conn, paths);
conn->to.flush(); conn->to.flush();
for (auto & i : WorkerProto<StorePathSet>::read(*this, conn->from)) for (auto & i : WorkerProto::Serialise<StorePathSet>::read(*this, *conn))
out.insert(i); out.insert(i);
} }
@ -382,13 +419,13 @@ public:
auto conn(connections->get()); auto conn(connections->get());
conn->to conn->to
<< cmdQueryValidPaths << ServeProto::Command::QueryValidPaths
<< false // lock << false // lock
<< maybeSubstitute; << maybeSubstitute;
workerProtoWrite(*this, conn->to, paths); WorkerProto::write(*this, *conn, paths);
conn->to.flush(); conn->to.flush();
return WorkerProto<StorePathSet>::read(*this, conn->from); return WorkerProto::Serialise<StorePathSet>::read(*this, *conn);
} }
void connect() override void connect() override

View file

@ -1,5 +1,6 @@
#include "path-info.hh" #include "path-info.hh"
#include "worker-protocol.hh" #include "worker-protocol.hh"
#include "worker-protocol-impl.hh"
#include "store-api.hh" #include "store-api.hh"
namespace nix { namespace nix {
@ -132,7 +133,8 @@ ValidPathInfo ValidPathInfo::read(Source & source, const Store & store, unsigned
auto narHash = Hash::parseAny(readString(source), htSHA256); auto narHash = Hash::parseAny(readString(source), htSHA256);
ValidPathInfo info(path, narHash); ValidPathInfo info(path, narHash);
if (deriver != "") info.deriver = store.parseStorePath(deriver); if (deriver != "") info.deriver = store.parseStorePath(deriver);
info.references = WorkerProto<StorePathSet>::read(store, source); info.references = WorkerProto::Serialise<StorePathSet>::read(store,
WorkerProto::ReadConn { .from = source });
source >> info.registrationTime >> info.narSize; source >> info.registrationTime >> info.narSize;
if (format >= 16) { if (format >= 16) {
source >> info.ultimate; source >> info.ultimate;
@ -153,7 +155,9 @@ void ValidPathInfo::write(
sink << store.printStorePath(path); sink << store.printStorePath(path);
sink << (deriver ? store.printStorePath(*deriver) : "") sink << (deriver ? store.printStorePath(*deriver) : "")
<< narHash.to_string(Base16, false); << narHash.to_string(Base16, false);
workerProtoWrite(store, sink, references); WorkerProto::write(store,
WorkerProto::WriteConn { .to = sink },
references);
sink << registrationTime << narSize; sink << registrationTime << narSize;
if (format >= 16) { if (format >= 16) {
sink << ultimate sink << ultimate

View file

@ -0,0 +1,97 @@
#include "remote-store.hh"
#include "worker-protocol.hh"
namespace nix {
/**
* Bidirectional connection (send and receive) used by the Remote Store
* implementation.
*
* Contains `Source` and `Sink` for actual communication, along with
* other information learned when negotiating the connection.
*/
struct RemoteStore::Connection
{
/**
* Send with this.
*/
FdSink to;
/**
* Receive with this.
*/
FdSource from;
/**
* Worker protocol version used for the connection.
*
* Despite its name, I think it is actually the maximum version both
* sides support. (If the maximum doesn't exist, we would fail to
* establish a connection and produce a value of this type.)
*/
unsigned int daemonVersion;
/**
* Whether the remote side trusts us or not.
*
* 3 values: "yes", "no", or `std::nullopt` for "unknown".
*
* Note that the "remote side" might not be just the end daemon, but
* also an intermediary forwarder that can make its own trusting
* decisions. This would be the intersection of all their trust
* decisions, since it takes only one link in the chain to start
* denying operations.
*/
std::optional<TrustedFlag> remoteTrustsUs;
/**
* The version of the Nix daemon that is processing our requests.
*
* Do note, it may or may not communicating with another daemon,
* rather than being an "end" `LocalStore` or similar.
*/
std::optional<std::string> daemonNixVersion;
/**
* Time this connection was established.
*/
std::chrono::time_point<std::chrono::steady_clock> startTime;
/**
* Coercion to `WorkerProto::ReadConn`. This makes it easy to use the
* factored out worker protocol searlizers with a
* `RemoteStore::Connection`.
*
* The worker protocol connection types are unidirectional, unlike
* this type.
*/
operator WorkerProto::ReadConn ()
{
return WorkerProto::ReadConn {
.from = from,
};
}
/**
* Coercion to `WorkerProto::WriteConn`. This makes it easy to use the
* factored out worker protocol searlizers with a
* `RemoteStore::Connection`.
*
* The worker protocol connection types are unidirectional, unlike
* this type.
*/
operator WorkerProto::WriteConn ()
{
return WorkerProto::WriteConn {
.to = to,
};
}
virtual ~Connection();
virtual void closeWrite() = 0;
std::exception_ptr processStderr(Sink * sink = 0, Source * source = 0, bool flush = true);
};
}

View file

@ -5,7 +5,9 @@
#include "remote-fs-accessor.hh" #include "remote-fs-accessor.hh"
#include "build-result.hh" #include "build-result.hh"
#include "remote-store.hh" #include "remote-store.hh"
#include "remote-store-connection.hh"
#include "worker-protocol.hh" #include "worker-protocol.hh"
#include "worker-protocol-impl.hh"
#include "archive.hh" #include "archive.hh"
#include "globals.hh" #include "globals.hh"
#include "derivations.hh" #include "derivations.hh"
@ -100,7 +102,7 @@ void RemoteStore::initConnection(Connection & conn)
} }
if (GET_PROTOCOL_MINOR(conn.daemonVersion) >= 35) { if (GET_PROTOCOL_MINOR(conn.daemonVersion) >= 35) {
conn.remoteTrustsUs = WorkerProto<std::optional<TrustedFlag>>::read(*this, conn.from); conn.remoteTrustsUs = WorkerProto::Serialise<std::optional<TrustedFlag>>::read(*this, conn);
} else { } else {
// We don't know the answer; protocol to old. // We don't know the answer; protocol to old.
conn.remoteTrustsUs = std::nullopt; conn.remoteTrustsUs = std::nullopt;
@ -119,7 +121,7 @@ void RemoteStore::initConnection(Connection & conn)
void RemoteStore::setOptions(Connection & conn) void RemoteStore::setOptions(Connection & conn)
{ {
conn.to << wopSetOptions conn.to << WorkerProto::Op::SetOptions
<< settings.keepFailed << settings.keepFailed
<< settings.keepGoing << settings.keepGoing
<< settings.tryFallback << settings.tryFallback
@ -184,6 +186,7 @@ struct ConnectionHandle
} }
RemoteStore::Connection * operator -> () { return &*handle; } RemoteStore::Connection * operator -> () { return &*handle; }
RemoteStore::Connection & operator * () { return *handle; }
void processStderr(Sink * sink = 0, Source * source = 0, bool flush = true) void processStderr(Sink * sink = 0, Source * source = 0, bool flush = true)
{ {
@ -211,7 +214,7 @@ void RemoteStore::setOptions()
bool RemoteStore::isValidPathUncached(const StorePath & path) bool RemoteStore::isValidPathUncached(const StorePath & path)
{ {
auto conn(getConnection()); auto conn(getConnection());
conn->to << wopIsValidPath << printStorePath(path); conn->to << WorkerProto::Op::IsValidPath << printStorePath(path);
conn.processStderr(); conn.processStderr();
return readInt(conn->from); return readInt(conn->from);
} }
@ -226,13 +229,13 @@ StorePathSet RemoteStore::queryValidPaths(const StorePathSet & paths, Substitute
if (isValidPath(i)) res.insert(i); if (isValidPath(i)) res.insert(i);
return res; return res;
} else { } else {
conn->to << wopQueryValidPaths; conn->to << WorkerProto::Op::QueryValidPaths;
workerProtoWrite(*this, conn->to, paths); WorkerProto::write(*this, *conn, paths);
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 27) { if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 27) {
conn->to << (settings.buildersUseSubstitutes ? 1 : 0); conn->to << (settings.buildersUseSubstitutes ? 1 : 0);
} }
conn.processStderr(); conn.processStderr();
return WorkerProto<StorePathSet>::read(*this, conn->from); return WorkerProto::Serialise<StorePathSet>::read(*this, *conn);
} }
} }
@ -240,9 +243,9 @@ StorePathSet RemoteStore::queryValidPaths(const StorePathSet & paths, Substitute
StorePathSet RemoteStore::queryAllValidPaths() StorePathSet RemoteStore::queryAllValidPaths()
{ {
auto conn(getConnection()); auto conn(getConnection());
conn->to << wopQueryAllValidPaths; conn->to << WorkerProto::Op::QueryAllValidPaths;
conn.processStderr(); conn.processStderr();
return WorkerProto<StorePathSet>::read(*this, conn->from); return WorkerProto::Serialise<StorePathSet>::read(*this, *conn);
} }
@ -252,16 +255,16 @@ StorePathSet RemoteStore::querySubstitutablePaths(const StorePathSet & paths)
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) { if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
StorePathSet res; StorePathSet res;
for (auto & i : paths) { for (auto & i : paths) {
conn->to << wopHasSubstitutes << printStorePath(i); conn->to << WorkerProto::Op::HasSubstitutes << printStorePath(i);
conn.processStderr(); conn.processStderr();
if (readInt(conn->from)) res.insert(i); if (readInt(conn->from)) res.insert(i);
} }
return res; return res;
} else { } else {
conn->to << wopQuerySubstitutablePaths; conn->to << WorkerProto::Op::QuerySubstitutablePaths;
workerProtoWrite(*this, conn->to, paths); WorkerProto::write(*this, *conn, paths);
conn.processStderr(); conn.processStderr();
return WorkerProto<StorePathSet>::read(*this, conn->from); return WorkerProto::Serialise<StorePathSet>::read(*this, *conn);
} }
} }
@ -276,14 +279,14 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S
for (auto & i : pathsMap) { for (auto & i : pathsMap) {
SubstitutablePathInfo info; SubstitutablePathInfo info;
conn->to << wopQuerySubstitutablePathInfo << printStorePath(i.first); conn->to << WorkerProto::Op::QuerySubstitutablePathInfo << printStorePath(i.first);
conn.processStderr(); conn.processStderr();
unsigned int reply = readInt(conn->from); unsigned int reply = readInt(conn->from);
if (reply == 0) continue; if (reply == 0) continue;
auto deriver = readString(conn->from); auto deriver = readString(conn->from);
if (deriver != "") if (deriver != "")
info.deriver = parseStorePath(deriver); info.deriver = parseStorePath(deriver);
info.references = WorkerProto<StorePathSet>::read(*this, conn->from); info.references = WorkerProto::Serialise<StorePathSet>::read(*this, *conn);
info.downloadSize = readLongLong(conn->from); info.downloadSize = readLongLong(conn->from);
info.narSize = readLongLong(conn->from); info.narSize = readLongLong(conn->from);
infos.insert_or_assign(i.first, std::move(info)); infos.insert_or_assign(i.first, std::move(info));
@ -291,14 +294,14 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S
} else { } else {
conn->to << wopQuerySubstitutablePathInfos; conn->to << WorkerProto::Op::QuerySubstitutablePathInfos;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 22) { if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 22) {
StorePathSet paths; StorePathSet paths;
for (auto & path : pathsMap) for (auto & path : pathsMap)
paths.insert(path.first); paths.insert(path.first);
workerProtoWrite(*this, conn->to, paths); WorkerProto::write(*this, *conn, paths);
} else } else
workerProtoWrite(*this, conn->to, pathsMap); WorkerProto::write(*this, *conn, pathsMap);
conn.processStderr(); conn.processStderr();
size_t count = readNum<size_t>(conn->from); size_t count = readNum<size_t>(conn->from);
for (size_t n = 0; n < count; n++) { for (size_t n = 0; n < count; n++) {
@ -306,7 +309,7 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S
auto deriver = readString(conn->from); auto deriver = readString(conn->from);
if (deriver != "") if (deriver != "")
info.deriver = parseStorePath(deriver); info.deriver = parseStorePath(deriver);
info.references = WorkerProto<StorePathSet>::read(*this, conn->from); info.references = WorkerProto::Serialise<StorePathSet>::read(*this, *conn);
info.downloadSize = readLongLong(conn->from); info.downloadSize = readLongLong(conn->from);
info.narSize = readLongLong(conn->from); info.narSize = readLongLong(conn->from);
} }
@ -322,7 +325,7 @@ void RemoteStore::queryPathInfoUncached(const StorePath & path,
std::shared_ptr<const ValidPathInfo> info; std::shared_ptr<const ValidPathInfo> info;
{ {
auto conn(getConnection()); auto conn(getConnection());
conn->to << wopQueryPathInfo << printStorePath(path); conn->to << WorkerProto::Op::QueryPathInfo << printStorePath(path);
try { try {
conn.processStderr(); conn.processStderr();
} catch (Error & e) { } catch (Error & e) {
@ -347,9 +350,9 @@ void RemoteStore::queryReferrers(const StorePath & path,
StorePathSet & referrers) StorePathSet & referrers)
{ {
auto conn(getConnection()); auto conn(getConnection());
conn->to << wopQueryReferrers << printStorePath(path); conn->to << WorkerProto::Op::QueryReferrers << printStorePath(path);
conn.processStderr(); conn.processStderr();
for (auto & i : WorkerProto<StorePathSet>::read(*this, conn->from)) for (auto & i : WorkerProto::Serialise<StorePathSet>::read(*this, *conn))
referrers.insert(i); referrers.insert(i);
} }
@ -357,9 +360,9 @@ void RemoteStore::queryReferrers(const StorePath & path,
StorePathSet RemoteStore::queryValidDerivers(const StorePath & path) StorePathSet RemoteStore::queryValidDerivers(const StorePath & path)
{ {
auto conn(getConnection()); auto conn(getConnection());
conn->to << wopQueryValidDerivers << printStorePath(path); conn->to << WorkerProto::Op::QueryValidDerivers << printStorePath(path);
conn.processStderr(); conn.processStderr();
return WorkerProto<StorePathSet>::read(*this, conn->from); return WorkerProto::Serialise<StorePathSet>::read(*this, *conn);
} }
@ -369,9 +372,9 @@ StorePathSet RemoteStore::queryDerivationOutputs(const StorePath & path)
return Store::queryDerivationOutputs(path); return Store::queryDerivationOutputs(path);
} }
auto conn(getConnection()); auto conn(getConnection());
conn->to << wopQueryDerivationOutputs << printStorePath(path); conn->to << WorkerProto::Op::QueryDerivationOutputs << printStorePath(path);
conn.processStderr(); conn.processStderr();
return WorkerProto<StorePathSet>::read(*this, conn->from); return WorkerProto::Serialise<StorePathSet>::read(*this, *conn);
} }
@ -379,9 +382,9 @@ std::map<std::string, std::optional<StorePath>> RemoteStore::queryPartialDerivat
{ {
if (GET_PROTOCOL_MINOR(getProtocol()) >= 0x16) { if (GET_PROTOCOL_MINOR(getProtocol()) >= 0x16) {
auto conn(getConnection()); auto conn(getConnection());
conn->to << wopQueryDerivationOutputMap << printStorePath(path); conn->to << WorkerProto::Op::QueryDerivationOutputMap << printStorePath(path);
conn.processStderr(); conn.processStderr();
return WorkerProto<std::map<std::string, std::optional<StorePath>>>::read(*this, conn->from); return WorkerProto::Serialise<std::map<std::string, std::optional<StorePath>>>::read(*this, *conn);
} else { } else {
// Fallback for old daemon versions. // Fallback for old daemon versions.
// For floating-CA derivations (and their co-dependencies) this is an // For floating-CA derivations (and their co-dependencies) this is an
@ -402,7 +405,7 @@ std::map<std::string, std::optional<StorePath>> RemoteStore::queryPartialDerivat
std::optional<StorePath> RemoteStore::queryPathFromHashPart(const std::string & hashPart) std::optional<StorePath> RemoteStore::queryPathFromHashPart(const std::string & hashPart)
{ {
auto conn(getConnection()); auto conn(getConnection());
conn->to << wopQueryPathFromHashPart << hashPart; conn->to << WorkerProto::Op::QueryPathFromHashPart << hashPart;
conn.processStderr(); conn.processStderr();
Path path = readString(conn->from); Path path = readString(conn->from);
if (path.empty()) return {}; if (path.empty()) return {};
@ -424,10 +427,10 @@ ref<const ValidPathInfo> RemoteStore::addCAToStore(
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 25) { if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 25) {
conn->to conn->to
<< wopAddToStore << WorkerProto::Op::AddToStore
<< name << name
<< caMethod.render(hashType); << caMethod.render(hashType);
workerProtoWrite(*this, conn->to, references); WorkerProto::write(*this, *conn, references);
conn->to << repair; conn->to << repair;
// The dump source may invoke the store, so we need to make some room. // The dump source may invoke the store, so we need to make some room.
@ -451,13 +454,13 @@ ref<const ValidPathInfo> RemoteStore::addCAToStore(
throw UnimplementedError("When adding text-hashed data called '%s', only SHA-256 is supported but '%s' was given", throw UnimplementedError("When adding text-hashed data called '%s', only SHA-256 is supported but '%s' was given",
name, printHashType(hashType)); name, printHashType(hashType));
std::string s = dump.drain(); std::string s = dump.drain();
conn->to << wopAddTextToStore << name << s; conn->to << WorkerProto::Op::AddTextToStore << name << s;
workerProtoWrite(*this, conn->to, references); WorkerProto::write(*this, *conn, references);
conn.processStderr(); conn.processStderr();
}, },
[&](const FileIngestionMethod & fim) -> void { [&](const FileIngestionMethod & fim) -> void {
conn->to conn->to
<< wopAddToStore << WorkerProto::Op::AddToStore
<< name << name
<< ((hashType == htSHA256 && fim == FileIngestionMethod::Recursive) ? 0 : 1) /* backwards compatibility hack */ << ((hashType == htSHA256 && fim == FileIngestionMethod::Recursive) ? 0 : 1) /* backwards compatibility hack */
<< (fim == FileIngestionMethod::Recursive ? 1 : 0) << (fim == FileIngestionMethod::Recursive ? 1 : 0)
@ -509,7 +512,7 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
auto conn(getConnection()); auto conn(getConnection());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 18) { if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 18) {
conn->to << wopImportPaths; conn->to << WorkerProto::Op::ImportPaths;
auto source2 = sinkToSource([&](Sink & sink) { auto source2 = sinkToSource([&](Sink & sink) {
sink << 1 // == path follows sink << 1 // == path follows
@ -518,7 +521,7 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
sink sink
<< exportMagic << exportMagic
<< printStorePath(info.path); << printStorePath(info.path);
workerProtoWrite(*this, sink, info.references); WorkerProto::write(*this, *conn, info.references);
sink sink
<< (info.deriver ? printStorePath(*info.deriver) : "") << (info.deriver ? printStorePath(*info.deriver) : "")
<< 0 // == no legacy signature << 0 // == no legacy signature
@ -528,16 +531,16 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
conn.processStderr(0, source2.get()); conn.processStderr(0, source2.get());
auto importedPaths = WorkerProto<StorePathSet>::read(*this, conn->from); auto importedPaths = WorkerProto::Serialise<StorePathSet>::read(*this, *conn);
assert(importedPaths.size() <= 1); assert(importedPaths.size() <= 1);
} }
else { else {
conn->to << wopAddToStoreNar conn->to << WorkerProto::Op::AddToStoreNar
<< printStorePath(info.path) << printStorePath(info.path)
<< (info.deriver ? printStorePath(*info.deriver) : "") << (info.deriver ? printStorePath(*info.deriver) : "")
<< info.narHash.to_string(Base16, false); << info.narHash.to_string(Base16, false);
workerProtoWrite(*this, conn->to, info.references); WorkerProto::write(*this, *conn, info.references);
conn->to << info.registrationTime << info.narSize conn->to << info.registrationTime << info.narSize
<< info.ultimate << info.sigs << renderContentAddress(info.ca) << info.ultimate << info.sigs << renderContentAddress(info.ca)
<< repair << !checkSigs; << repair << !checkSigs;
@ -581,7 +584,7 @@ void RemoteStore::addMultipleToStore(
if (GET_PROTOCOL_MINOR(getConnection()->daemonVersion) >= 32) { if (GET_PROTOCOL_MINOR(getConnection()->daemonVersion) >= 32) {
auto conn(getConnection()); auto conn(getConnection());
conn->to conn->to
<< wopAddMultipleToStore << WorkerProto::Op::AddMultipleToStore
<< repair << repair
<< !checkSigs; << !checkSigs;
conn.withFramedSink([&](Sink & sink) { conn.withFramedSink([&](Sink & sink) {
@ -605,12 +608,12 @@ StorePath RemoteStore::addTextToStore(
void RemoteStore::registerDrvOutput(const Realisation & info) void RemoteStore::registerDrvOutput(const Realisation & info)
{ {
auto conn(getConnection()); auto conn(getConnection());
conn->to << wopRegisterDrvOutput; conn->to << WorkerProto::Op::RegisterDrvOutput;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 31) { if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 31) {
conn->to << info.id.to_string(); conn->to << info.id.to_string();
conn->to << std::string(info.outPath.to_string()); conn->to << std::string(info.outPath.to_string());
} else { } else {
workerProtoWrite(*this, conn->to, info); WorkerProto::write(*this, *conn, info);
} }
conn.processStderr(); conn.processStderr();
} }
@ -626,20 +629,20 @@ void RemoteStore::queryRealisationUncached(const DrvOutput & id,
return callback(nullptr); return callback(nullptr);
} }
conn->to << wopQueryRealisation; conn->to << WorkerProto::Op::QueryRealisation;
conn->to << id.to_string(); conn->to << id.to_string();
conn.processStderr(); conn.processStderr();
auto real = [&]() -> std::shared_ptr<const Realisation> { auto real = [&]() -> std::shared_ptr<const Realisation> {
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 31) { if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 31) {
auto outPaths = WorkerProto<std::set<StorePath>>::read( auto outPaths = WorkerProto::Serialise<std::set<StorePath>>::read(
*this, conn->from); *this, *conn);
if (outPaths.empty()) if (outPaths.empty())
return nullptr; return nullptr;
return std::make_shared<const Realisation>(Realisation { .id = id, .outPath = *outPaths.begin() }); return std::make_shared<const Realisation>(Realisation { .id = id, .outPath = *outPaths.begin() });
} else { } else {
auto realisations = WorkerProto<std::set<Realisation>>::read( auto realisations = WorkerProto::Serialise<std::set<Realisation>>::read(
*this, conn->from); *this, *conn);
if (realisations.empty()) if (realisations.empty())
return nullptr; return nullptr;
return std::make_shared<const Realisation>(*realisations.begin()); return std::make_shared<const Realisation>(*realisations.begin());
@ -650,10 +653,10 @@ void RemoteStore::queryRealisationUncached(const DrvOutput & id,
} catch (...) { return callback.rethrow(); } } catch (...) { return callback.rethrow(); }
} }
static void writeDerivedPaths(RemoteStore & store, ConnectionHandle & conn, const std::vector<DerivedPath> & reqs) static void writeDerivedPaths(RemoteStore & store, RemoteStore::Connection & conn, const std::vector<DerivedPath> & reqs)
{ {
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 30) { if (GET_PROTOCOL_MINOR(conn.daemonVersion) >= 30) {
workerProtoWrite(store, conn->to, reqs); WorkerProto::write(store, conn, reqs);
} else { } else {
Strings ss; Strings ss;
for (auto & p : reqs) { for (auto & p : reqs) {
@ -665,12 +668,12 @@ static void writeDerivedPaths(RemoteStore & store, ConnectionHandle & conn, cons
[&](const StorePath & drvPath) { [&](const StorePath & drvPath) {
throw Error("trying to request '%s', but daemon protocol %d.%d is too old (< 1.29) to request a derivation file", throw Error("trying to request '%s', but daemon protocol %d.%d is too old (< 1.29) to request a derivation file",
store.printStorePath(drvPath), store.printStorePath(drvPath),
GET_PROTOCOL_MAJOR(conn->daemonVersion), GET_PROTOCOL_MAJOR(conn.daemonVersion),
GET_PROTOCOL_MINOR(conn->daemonVersion)); GET_PROTOCOL_MINOR(conn.daemonVersion));
}, },
}, sOrDrvPath); }, sOrDrvPath);
} }
conn->to << ss; conn.to << ss;
} }
} }
@ -694,9 +697,9 @@ void RemoteStore::buildPaths(const std::vector<DerivedPath> & drvPaths, BuildMod
copyDrvsFromEvalStore(drvPaths, evalStore); copyDrvsFromEvalStore(drvPaths, evalStore);
auto conn(getConnection()); auto conn(getConnection());
conn->to << wopBuildPaths; conn->to << WorkerProto::Op::BuildPaths;
assert(GET_PROTOCOL_MINOR(conn->daemonVersion) >= 13); assert(GET_PROTOCOL_MINOR(conn->daemonVersion) >= 13);
writeDerivedPaths(*this, conn, drvPaths); writeDerivedPaths(*this, *conn, drvPaths);
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 15) if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 15)
conn->to << buildMode; conn->to << buildMode;
else else
@ -719,11 +722,11 @@ std::vector<KeyedBuildResult> RemoteStore::buildPathsWithResults(
auto & conn = *conn_; auto & conn = *conn_;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 34) { if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 34) {
conn->to << wopBuildPathsWithResults; conn->to << WorkerProto::Op::BuildPathsWithResults;
writeDerivedPaths(*this, conn, paths); writeDerivedPaths(*this, *conn, paths);
conn->to << buildMode; conn->to << buildMode;
conn.processStderr(); conn.processStderr();
return WorkerProto<std::vector<KeyedBuildResult>>::read(*this, conn->from); return WorkerProto::Serialise<std::vector<KeyedBuildResult>>::read(*this, *conn);
} else { } else {
// Avoid deadlock. // Avoid deadlock.
conn_.reset(); conn_.reset();
@ -795,7 +798,7 @@ BuildResult RemoteStore::buildDerivation(const StorePath & drvPath, const BasicD
BuildMode buildMode) BuildMode buildMode)
{ {
auto conn(getConnection()); auto conn(getConnection());
conn->to << wopBuildDerivation << printStorePath(drvPath); conn->to << WorkerProto::Op::BuildDerivation << printStorePath(drvPath);
writeDerivation(conn->to, *this, drv); writeDerivation(conn->to, *this, drv);
conn->to << buildMode; conn->to << buildMode;
conn.processStderr(); conn.processStderr();
@ -806,7 +809,7 @@ BuildResult RemoteStore::buildDerivation(const StorePath & drvPath, const BasicD
conn->from >> res.timesBuilt >> res.isNonDeterministic >> res.startTime >> res.stopTime; conn->from >> res.timesBuilt >> res.isNonDeterministic >> res.startTime >> res.stopTime;
} }
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 28) { if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 28) {
auto builtOutputs = WorkerProto<DrvOutputs>::read(*this, conn->from); auto builtOutputs = WorkerProto::Serialise<DrvOutputs>::read(*this, *conn);
for (auto && [output, realisation] : builtOutputs) for (auto && [output, realisation] : builtOutputs)
res.builtOutputs.insert_or_assign( res.builtOutputs.insert_or_assign(
std::move(output.outputName), std::move(output.outputName),
@ -819,7 +822,7 @@ BuildResult RemoteStore::buildDerivation(const StorePath & drvPath, const BasicD
void RemoteStore::ensurePath(const StorePath & path) void RemoteStore::ensurePath(const StorePath & path)
{ {
auto conn(getConnection()); auto conn(getConnection());
conn->to << wopEnsurePath << printStorePath(path); conn->to << WorkerProto::Op::EnsurePath << printStorePath(path);
conn.processStderr(); conn.processStderr();
readInt(conn->from); readInt(conn->from);
} }
@ -828,7 +831,7 @@ void RemoteStore::ensurePath(const StorePath & path)
void RemoteStore::addTempRoot(const StorePath & path) void RemoteStore::addTempRoot(const StorePath & path)
{ {
auto conn(getConnection()); auto conn(getConnection());
conn->to << wopAddTempRoot << printStorePath(path); conn->to << WorkerProto::Op::AddTempRoot << printStorePath(path);
conn.processStderr(); conn.processStderr();
readInt(conn->from); readInt(conn->from);
} }
@ -837,7 +840,7 @@ void RemoteStore::addTempRoot(const StorePath & path)
void RemoteStore::addIndirectRoot(const Path & path) void RemoteStore::addIndirectRoot(const Path & path)
{ {
auto conn(getConnection()); auto conn(getConnection());
conn->to << wopAddIndirectRoot << path; conn->to << WorkerProto::Op::AddIndirectRoot << path;
conn.processStderr(); conn.processStderr();
readInt(conn->from); readInt(conn->from);
} }
@ -846,7 +849,7 @@ void RemoteStore::addIndirectRoot(const Path & path)
Roots RemoteStore::findRoots(bool censor) Roots RemoteStore::findRoots(bool censor)
{ {
auto conn(getConnection()); auto conn(getConnection());
conn->to << wopFindRoots; conn->to << WorkerProto::Op::FindRoots;
conn.processStderr(); conn.processStderr();
size_t count = readNum<size_t>(conn->from); size_t count = readNum<size_t>(conn->from);
Roots result; Roots result;
@ -864,8 +867,8 @@ void RemoteStore::collectGarbage(const GCOptions & options, GCResults & results)
auto conn(getConnection()); auto conn(getConnection());
conn->to conn->to
<< wopCollectGarbage << options.action; << WorkerProto::Op::CollectGarbage << options.action;
workerProtoWrite(*this, conn->to, options.pathsToDelete); WorkerProto::write(*this, *conn, options.pathsToDelete);
conn->to << options.ignoreLiveness conn->to << options.ignoreLiveness
<< options.maxFreed << options.maxFreed
/* removed options */ /* removed options */
@ -887,7 +890,7 @@ void RemoteStore::collectGarbage(const GCOptions & options, GCResults & results)
void RemoteStore::optimiseStore() void RemoteStore::optimiseStore()
{ {
auto conn(getConnection()); auto conn(getConnection());
conn->to << wopOptimiseStore; conn->to << WorkerProto::Op::OptimiseStore;
conn.processStderr(); conn.processStderr();
readInt(conn->from); readInt(conn->from);
} }
@ -896,7 +899,7 @@ void RemoteStore::optimiseStore()
bool RemoteStore::verifyStore(bool checkContents, RepairFlag repair) bool RemoteStore::verifyStore(bool checkContents, RepairFlag repair)
{ {
auto conn(getConnection()); auto conn(getConnection());
conn->to << wopVerifyStore << checkContents << repair; conn->to << WorkerProto::Op::VerifyStore << checkContents << repair;
conn.processStderr(); conn.processStderr();
return readInt(conn->from); return readInt(conn->from);
} }
@ -905,7 +908,7 @@ bool RemoteStore::verifyStore(bool checkContents, RepairFlag repair)
void RemoteStore::addSignatures(const StorePath & storePath, const StringSet & sigs) void RemoteStore::addSignatures(const StorePath & storePath, const StringSet & sigs)
{ {
auto conn(getConnection()); auto conn(getConnection());
conn->to << wopAddSignatures << printStorePath(storePath) << sigs; conn->to << WorkerProto::Op::AddSignatures << printStorePath(storePath) << sigs;
conn.processStderr(); conn.processStderr();
readInt(conn->from); readInt(conn->from);
} }
@ -921,12 +924,12 @@ void RemoteStore::queryMissing(const std::vector<DerivedPath> & targets,
// Don't hold the connection handle in the fallback case // Don't hold the connection handle in the fallback case
// to prevent a deadlock. // to prevent a deadlock.
goto fallback; goto fallback;
conn->to << wopQueryMissing; conn->to << WorkerProto::Op::QueryMissing;
writeDerivedPaths(*this, conn, targets); writeDerivedPaths(*this, *conn, targets);
conn.processStderr(); conn.processStderr();
willBuild = WorkerProto<StorePathSet>::read(*this, conn->from); willBuild = WorkerProto::Serialise<StorePathSet>::read(*this, *conn);
willSubstitute = WorkerProto<StorePathSet>::read(*this, conn->from); willSubstitute = WorkerProto::Serialise<StorePathSet>::read(*this, *conn);
unknown = WorkerProto<StorePathSet>::read(*this, conn->from); unknown = WorkerProto::Serialise<StorePathSet>::read(*this, *conn);
conn->from >> downloadSize >> narSize; conn->from >> downloadSize >> narSize;
return; return;
} }
@ -940,7 +943,7 @@ void RemoteStore::queryMissing(const std::vector<DerivedPath> & targets,
void RemoteStore::addBuildLog(const StorePath & drvPath, std::string_view log) void RemoteStore::addBuildLog(const StorePath & drvPath, std::string_view log)
{ {
auto conn(getConnection()); auto conn(getConnection());
conn->to << wopAddBuildLog << drvPath.to_string(); conn->to << WorkerProto::Op::AddBuildLog << drvPath.to_string();
StringSource source(log); StringSource source(log);
conn.withFramedSink([&](Sink & sink) { conn.withFramedSink([&](Sink & sink) {
source.drainInto(sink); source.drainInto(sink);
@ -992,7 +995,7 @@ RemoteStore::Connection::~Connection()
void RemoteStore::narFromPath(const StorePath & path, Sink & sink) void RemoteStore::narFromPath(const StorePath & path, Sink & sink)
{ {
auto conn(connections->get()); auto conn(connections->get());
conn->to << wopNarFromPath << printStorePath(path); conn->to << WorkerProto::Op::NarFromPath << printStorePath(path);
conn->processStderr(); conn->processStderr();
copyNAR(conn->from, sink); copyNAR(conn->from, sink);
} }

View file

@ -166,21 +166,7 @@ public:
void flushBadConnections(); void flushBadConnections();
struct Connection struct Connection;
{
FdSink to;
FdSource from;
unsigned int daemonVersion;
std::optional<TrustedFlag> remoteTrustsUs;
std::optional<std::string> daemonNixVersion;
std::chrono::time_point<std::chrono::steady_clock> startTime;
virtual ~Connection();
virtual void closeWrite() = 0;
std::exception_ptr processStderr(Sink * sink = 0, Source * source = 0, bool flush = true);
};
ref<Connection> openConnectionWrapper(); ref<Connection> openConnectionWrapper();

View file

@ -10,16 +10,52 @@ namespace nix {
#define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00) #define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00)
#define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff) #define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff)
typedef enum { /**
cmdQueryValidPaths = 1, * The "serve protocol", used by ssh:// stores.
cmdQueryPathInfos = 2, *
cmdDumpStorePath = 3, * This `struct` is basically just a `namespace`; We use a type rather
cmdImportPaths = 4, * than a namespace just so we can use it as a template argument.
cmdExportPaths = 5, */
cmdBuildPaths = 6, struct ServeProto
cmdQueryClosure = 7, {
cmdBuildDerivation = 8, /**
cmdAddToStoreNar = 9, * Enumeration of all the request types for the protocol.
} ServeCommand; */
enum struct Command : uint64_t;
};
enum struct ServeProto::Command : uint64_t
{
QueryValidPaths = 1,
QueryPathInfos = 2,
DumpStorePath = 3,
ImportPaths = 4,
ExportPaths = 5,
BuildPaths = 6,
QueryClosure = 7,
BuildDerivation = 8,
AddToStoreNar = 9,
};
/**
* Convenience for sending operation codes.
*
* @todo Switch to using `ServeProto::Serialize` instead probably. But
* this was not done at this time so there would be less churn.
*/
inline Sink & operator << (Sink & sink, ServeProto::Command op)
{
return sink << (uint64_t) op;
}
/**
* Convenience for debugging.
*
* @todo Perhaps render known opcodes more nicely.
*/
inline std::ostream & operator << (std::ostream & s, ServeProto::Command op)
{
return s << (uint64_t) op;
}
} }

View file

@ -1,6 +1,7 @@
#include "ssh-store-config.hh" #include "ssh-store-config.hh"
#include "store-api.hh" #include "store-api.hh"
#include "remote-store.hh" #include "remote-store.hh"
#include "remote-store-connection.hh"
#include "remote-fs-accessor.hh" #include "remote-fs-accessor.hh"
#include "archive.hh" #include "archive.hh"
#include "worker-protocol.hh" #include "worker-protocol.hh"

View file

@ -2,6 +2,7 @@
///@file ///@file
#include "remote-store.hh" #include "remote-store.hh"
#include "remote-store-connection.hh"
#include "local-fs-store.hh" #include "local-fs-store.hh"
namespace nix { namespace nix {

View file

@ -0,0 +1,78 @@
#pragma once
/**
* @file
*
* Template implementations (as opposed to mere declarations).
*
* This file is an exmample of the "impl.hh" pattern. See the
* contributing guide.
*/
#include "worker-protocol.hh"
namespace nix {
template<typename T>
std::vector<T> WorkerProto::Serialise<std::vector<T>>::read(const Store & store, WorkerProto::ReadConn conn)
{
std::vector<T> resSet;
auto size = readNum<size_t>(conn.from);
while (size--) {
resSet.push_back(WorkerProto::Serialise<T>::read(store, conn));
}
return resSet;
}
template<typename T>
void WorkerProto::Serialise<std::vector<T>>::write(const Store & store, WorkerProto::WriteConn conn, const std::vector<T> & resSet)
{
conn.to << resSet.size();
for (auto & key : resSet) {
WorkerProto::Serialise<T>::write(store, conn, key);
}
}
template<typename T>
std::set<T> WorkerProto::Serialise<std::set<T>>::read(const Store & store, WorkerProto::ReadConn conn)
{
std::set<T> resSet;
auto size = readNum<size_t>(conn.from);
while (size--) {
resSet.insert(WorkerProto::Serialise<T>::read(store, conn));
}
return resSet;
}
template<typename T>
void WorkerProto::Serialise<std::set<T>>::write(const Store & store, WorkerProto::WriteConn conn, const std::set<T> & resSet)
{
conn.to << resSet.size();
for (auto & key : resSet) {
WorkerProto::Serialise<T>::write(store, conn, key);
}
}
template<typename K, typename V>
std::map<K, V> WorkerProto::Serialise<std::map<K, V>>::read(const Store & store, WorkerProto::ReadConn conn)
{
std::map<K, V> resMap;
auto size = readNum<size_t>(conn.from);
while (size--) {
auto k = WorkerProto::Serialise<K>::read(store, conn);
auto v = WorkerProto::Serialise<V>::read(store, conn);
resMap.insert_or_assign(std::move(k), std::move(v));
}
return resMap;
}
template<typename K, typename V>
void WorkerProto::Serialise<std::map<K, V>>::write(const Store & store, WorkerProto::WriteConn conn, const std::map<K, V> & resMap)
{
conn.to << resMap.size();
for (auto & i : resMap) {
WorkerProto::Serialise<K>::write(store, conn, i.first);
WorkerProto::Serialise<V>::write(store, conn, i.second);
}
}
}

View file

@ -4,6 +4,7 @@
#include "store-api.hh" #include "store-api.hh"
#include "build-result.hh" #include "build-result.hh"
#include "worker-protocol.hh" #include "worker-protocol.hh"
#include "worker-protocol-impl.hh"
#include "archive.hh" #include "archive.hh"
#include "derivations.hh" #include "derivations.hh"
@ -11,31 +12,31 @@
namespace nix { namespace nix {
std::string WorkerProto<std::string>::read(const Store & store, Source & from) std::string WorkerProto::Serialise<std::string>::read(const Store & store, WorkerProto::ReadConn conn)
{ {
return readString(from); return readString(conn.from);
} }
void WorkerProto<std::string>::write(const Store & store, Sink & out, const std::string & str) void WorkerProto::Serialise<std::string>::write(const Store & store, WorkerProto::WriteConn conn, const std::string & str)
{ {
out << str; conn.to << str;
} }
StorePath WorkerProto<StorePath>::read(const Store & store, Source & from) StorePath WorkerProto::Serialise<StorePath>::read(const Store & store, WorkerProto::ReadConn conn)
{ {
return store.parseStorePath(readString(from)); return store.parseStorePath(readString(conn.from));
} }
void WorkerProto<StorePath>::write(const Store & store, Sink & out, const StorePath & storePath) void WorkerProto::Serialise<StorePath>::write(const Store & store, WorkerProto::WriteConn conn, const StorePath & storePath)
{ {
out << store.printStorePath(storePath); conn.to << store.printStorePath(storePath);
} }
std::optional<TrustedFlag> WorkerProto<std::optional<TrustedFlag>>::read(const Store & store, Source & from) std::optional<TrustedFlag> WorkerProto::Serialise<std::optional<TrustedFlag>>::read(const Store & store, WorkerProto::ReadConn conn)
{ {
auto temp = readNum<uint8_t>(from); auto temp = readNum<uint8_t>(conn.from);
switch (temp) { switch (temp) {
case 0: case 0:
return std::nullopt; return std::nullopt;
@ -48,17 +49,17 @@ std::optional<TrustedFlag> WorkerProto<std::optional<TrustedFlag>>::read(const S
} }
} }
void WorkerProto<std::optional<TrustedFlag>>::write(const Store & store, Sink & out, const std::optional<TrustedFlag> & optTrusted) void WorkerProto::Serialise<std::optional<TrustedFlag>>::write(const Store & store, WorkerProto::WriteConn conn, const std::optional<TrustedFlag> & optTrusted)
{ {
if (!optTrusted) if (!optTrusted)
out << (uint8_t)0; conn.to << (uint8_t)0;
else { else {
switch (*optTrusted) { switch (*optTrusted) {
case Trusted: case Trusted:
out << (uint8_t)1; conn.to << (uint8_t)1;
break; break;
case NotTrusted: case NotTrusted:
out << (uint8_t)2; conn.to << (uint8_t)2;
break; break;
default: default:
assert(false); assert(false);
@ -67,83 +68,83 @@ void WorkerProto<std::optional<TrustedFlag>>::write(const Store & store, Sink &
} }
ContentAddress WorkerProto<ContentAddress>::read(const Store & store, Source & from) ContentAddress WorkerProto::Serialise<ContentAddress>::read(const Store & store, WorkerProto::ReadConn conn)
{ {
return ContentAddress::parse(readString(from)); return ContentAddress::parse(readString(conn.from));
} }
void WorkerProto<ContentAddress>::write(const Store & store, Sink & out, const ContentAddress & ca) void WorkerProto::Serialise<ContentAddress>::write(const Store & store, WorkerProto::WriteConn conn, const ContentAddress & ca)
{ {
out << renderContentAddress(ca); conn.to << renderContentAddress(ca);
} }
DerivedPath WorkerProto<DerivedPath>::read(const Store & store, Source & from) DerivedPath WorkerProto::Serialise<DerivedPath>::read(const Store & store, WorkerProto::ReadConn conn)
{ {
auto s = readString(from); auto s = readString(conn.from);
return DerivedPath::parseLegacy(store, s); return DerivedPath::parseLegacy(store, s);
} }
void WorkerProto<DerivedPath>::write(const Store & store, Sink & out, const DerivedPath & req) void WorkerProto::Serialise<DerivedPath>::write(const Store & store, WorkerProto::WriteConn conn, const DerivedPath & req)
{ {
out << req.to_string_legacy(store); conn.to << req.to_string_legacy(store);
} }
Realisation WorkerProto<Realisation>::read(const Store & store, Source & from) Realisation WorkerProto::Serialise<Realisation>::read(const Store & store, WorkerProto::ReadConn conn)
{ {
std::string rawInput = readString(from); std::string rawInput = readString(conn.from);
return Realisation::fromJSON( return Realisation::fromJSON(
nlohmann::json::parse(rawInput), nlohmann::json::parse(rawInput),
"remote-protocol" "remote-protocol"
); );
} }
void WorkerProto<Realisation>::write(const Store & store, Sink & out, const Realisation & realisation) void WorkerProto::Serialise<Realisation>::write(const Store & store, WorkerProto::WriteConn conn, const Realisation & realisation)
{ {
out << realisation.toJSON().dump(); conn.to << realisation.toJSON().dump();
} }
DrvOutput WorkerProto<DrvOutput>::read(const Store & store, Source & from) DrvOutput WorkerProto::Serialise<DrvOutput>::read(const Store & store, WorkerProto::ReadConn conn)
{ {
return DrvOutput::parse(readString(from)); return DrvOutput::parse(readString(conn.from));
} }
void WorkerProto<DrvOutput>::write(const Store & store, Sink & out, const DrvOutput & drvOutput) void WorkerProto::Serialise<DrvOutput>::write(const Store & store, WorkerProto::WriteConn conn, const DrvOutput & drvOutput)
{ {
out << drvOutput.to_string(); conn.to << drvOutput.to_string();
} }
KeyedBuildResult WorkerProto<KeyedBuildResult>::read(const Store & store, Source & from) KeyedBuildResult WorkerProto::Serialise<KeyedBuildResult>::read(const Store & store, WorkerProto::ReadConn conn)
{ {
auto path = WorkerProto<DerivedPath>::read(store, from); auto path = WorkerProto::Serialise<DerivedPath>::read(store, conn);
auto br = WorkerProto<BuildResult>::read(store, from); auto br = WorkerProto::Serialise<BuildResult>::read(store, conn);
return KeyedBuildResult { return KeyedBuildResult {
std::move(br), std::move(br),
/* .path = */ std::move(path), /* .path = */ std::move(path),
}; };
} }
void WorkerProto<KeyedBuildResult>::write(const Store & store, Sink & to, const KeyedBuildResult & res) void WorkerProto::Serialise<KeyedBuildResult>::write(const Store & store, WorkerProto::WriteConn conn, const KeyedBuildResult & res)
{ {
workerProtoWrite(store, to, res.path); WorkerProto::write(store, conn, res.path);
workerProtoWrite(store, to, static_cast<const BuildResult &>(res)); WorkerProto::write(store, conn, static_cast<const BuildResult &>(res));
} }
BuildResult WorkerProto<BuildResult>::read(const Store & store, Source & from) BuildResult WorkerProto::Serialise<BuildResult>::read(const Store & store, WorkerProto::ReadConn conn)
{ {
BuildResult res; BuildResult res;
res.status = (BuildResult::Status) readInt(from); res.status = (BuildResult::Status) readInt(conn.from);
from conn.from
>> res.errorMsg >> res.errorMsg
>> res.timesBuilt >> res.timesBuilt
>> res.isNonDeterministic >> res.isNonDeterministic
>> res.startTime >> res.startTime
>> res.stopTime; >> res.stopTime;
auto builtOutputs = WorkerProto<DrvOutputs>::read(store, from); auto builtOutputs = WorkerProto::Serialise<DrvOutputs>::read(store, conn);
for (auto && [output, realisation] : builtOutputs) for (auto && [output, realisation] : builtOutputs)
res.builtOutputs.insert_or_assign( res.builtOutputs.insert_or_assign(
std::move(output.outputName), std::move(output.outputName),
@ -151,9 +152,9 @@ BuildResult WorkerProto<BuildResult>::read(const Store & store, Source & from)
return res; return res;
} }
void WorkerProto<BuildResult>::write(const Store & store, Sink & to, const BuildResult & res) void WorkerProto::Serialise<BuildResult>::write(const Store & store, WorkerProto::WriteConn conn, const BuildResult & res)
{ {
to conn.to
<< res.status << res.status
<< res.errorMsg << res.errorMsg
<< res.timesBuilt << res.timesBuilt
@ -163,30 +164,30 @@ void WorkerProto<BuildResult>::write(const Store & store, Sink & to, const Build
DrvOutputs builtOutputs; DrvOutputs builtOutputs;
for (auto & [output, realisation] : res.builtOutputs) for (auto & [output, realisation] : res.builtOutputs)
builtOutputs.insert_or_assign(realisation.id, realisation); builtOutputs.insert_or_assign(realisation.id, realisation);
workerProtoWrite(store, to, builtOutputs); WorkerProto::write(store, conn, builtOutputs);
} }
std::optional<StorePath> WorkerProto<std::optional<StorePath>>::read(const Store & store, Source & from) std::optional<StorePath> WorkerProto::Serialise<std::optional<StorePath>>::read(const Store & store, WorkerProto::ReadConn conn)
{ {
auto s = readString(from); auto s = readString(conn.from);
return s == "" ? std::optional<StorePath> {} : store.parseStorePath(s); return s == "" ? std::optional<StorePath> {} : store.parseStorePath(s);
} }
void WorkerProto<std::optional<StorePath>>::write(const Store & store, Sink & out, const std::optional<StorePath> & storePathOpt) void WorkerProto::Serialise<std::optional<StorePath>>::write(const Store & store, WorkerProto::WriteConn conn, const std::optional<StorePath> & storePathOpt)
{ {
out << (storePathOpt ? store.printStorePath(*storePathOpt) : ""); conn.to << (storePathOpt ? store.printStorePath(*storePathOpt) : "");
} }
std::optional<ContentAddress> WorkerProto<std::optional<ContentAddress>>::read(const Store & store, Source & from) std::optional<ContentAddress> WorkerProto::Serialise<std::optional<ContentAddress>>::read(const Store & store, WorkerProto::ReadConn conn)
{ {
return ContentAddress::parseOpt(readString(from)); return ContentAddress::parseOpt(readString(conn.from));
} }
void WorkerProto<std::optional<ContentAddress>>::write(const Store & store, Sink & out, const std::optional<ContentAddress> & caOpt) void WorkerProto::Serialise<std::optional<ContentAddress>>::write(const Store & store, WorkerProto::WriteConn conn, const std::optional<ContentAddress> & caOpt)
{ {
out << (caOpt ? renderContentAddress(*caOpt) : ""); conn.to << (caOpt ? renderContentAddress(*caOpt) : "");
} }
} }

View file

@ -14,57 +14,6 @@ namespace nix {
#define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff) #define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff)
/**
* Enumeration of all the request types for the "worker protocol", used
* by unix:// and ssh-ng:// stores.
*/
typedef enum {
wopIsValidPath = 1,
wopHasSubstitutes = 3,
wopQueryPathHash = 4, // obsolete
wopQueryReferences = 5, // obsolete
wopQueryReferrers = 6,
wopAddToStore = 7,
wopAddTextToStore = 8, // obsolete since 1.25, Nix 3.0. Use wopAddToStore
wopBuildPaths = 9,
wopEnsurePath = 10,
wopAddTempRoot = 11,
wopAddIndirectRoot = 12,
wopSyncWithGC = 13,
wopFindRoots = 14,
wopExportPath = 16, // obsolete
wopQueryDeriver = 18, // obsolete
wopSetOptions = 19,
wopCollectGarbage = 20,
wopQuerySubstitutablePathInfo = 21,
wopQueryDerivationOutputs = 22, // obsolete
wopQueryAllValidPaths = 23,
wopQueryFailedPaths = 24,
wopClearFailedPaths = 25,
wopQueryPathInfo = 26,
wopImportPaths = 27, // obsolete
wopQueryDerivationOutputNames = 28, // obsolete
wopQueryPathFromHashPart = 29,
wopQuerySubstitutablePathInfos = 30,
wopQueryValidPaths = 31,
wopQuerySubstitutablePaths = 32,
wopQueryValidDerivers = 33,
wopOptimiseStore = 34,
wopVerifyStore = 35,
wopBuildDerivation = 36,
wopAddSignatures = 37,
wopNarFromPath = 38,
wopAddToStoreNar = 39,
wopQueryMissing = 40,
wopQueryDerivationOutputMap = 41,
wopRegisterDrvOutput = 42,
wopQueryRealisation = 43,
wopAddMultipleToStore = 44,
wopAddBuildLog = 45,
wopBuildPathsWithResults = 46,
} WorkerOp;
#define STDERR_NEXT 0x6f6c6d67 #define STDERR_NEXT 0x6f6c6d67
#define STDERR_READ 0x64617461 // data needed from source #define STDERR_READ 0x64617461 // data needed from source
#define STDERR_WRITE 0x64617416 // data for sink #define STDERR_WRITE 0x64617416 // data for sink
@ -78,7 +27,7 @@ typedef enum {
class Store; class Store;
struct Source; struct Source;
// items being serialized // items being serialised
struct DerivedPath; struct DerivedPath;
struct DrvOutput; struct DrvOutput;
struct Realisation; struct Realisation;
@ -88,31 +37,154 @@ enum TrustedFlag : bool;
/** /**
* Data type for canonical pairs of serializers for the worker protocol. * The "worker protocol", used by unix:// and ssh-ng:// stores.
*
* This `struct` is basically just a `namespace`; We use a type rather
* than a namespace just so we can use it as a template argument.
*/
struct WorkerProto
{
/**
* Enumeration of all the request types for the protocol.
*/
enum struct Op : uint64_t;
/**
* A unidirectional read connection, to be used by the read half of the
* canonical serializers below.
*
* This currently is just a `Source &`, but more fields will be added
* later.
*/
struct ReadConn {
Source & from;
};
/**
* A unidirectional write connection, to be used by the write half of the
* canonical serializers below.
*
* This currently is just a `Sink &`, but more fields will be added
* later.
*/
struct WriteConn {
Sink & to;
};
/**
* Data type for canonical pairs of serialisers for the worker protocol.
* *
* See https://en.cppreference.com/w/cpp/language/adl for the broader * See https://en.cppreference.com/w/cpp/language/adl for the broader
* concept of what is going on here. * concept of what is going on here.
*/ */
template<typename T> template<typename T>
struct WorkerProto { struct Serialise;
static T read(const Store & store, Source & from); // This is the definition of `Serialise` we *want* to put here, but
static void write(const Store & store, Sink & out, const T & t); // do not do so.
//
// The problem is that if we do so, C++ will think we have
// seralisers for *all* types. We don't, of course, but that won't
// cause an error until link time. That makes for long debug cycles
// when there is a missing serialiser.
//
// By not defining it globally, and instead letting individual
// serialisers specialise the type, we get back the compile-time
// errors we would like. When no serialiser exists, C++ sees an
// abstract "incomplete" type with no definition, and any attempt to
// use `to` or `from` static methods is a compile-time error because
// they don't exist on an incomplete type.
//
// This makes for a quicker debug cycle, as desired.
#if 0
{
static T read(const Store & store, ReadConn conn);
static void write(const Store & store, WriteConn conn, const T & t);
};
#endif
/**
* Wrapper function around `WorkerProto::Serialise<T>::write` that allows us to
* infer the type instead of having to write it down explicitly.
*/
template<typename T>
static void write(const Store & store, WriteConn conn, const T & t)
{
WorkerProto::Serialise<T>::write(store, conn, t);
}
};
enum struct WorkerProto::Op : uint64_t
{
IsValidPath = 1,
HasSubstitutes = 3,
QueryPathHash = 4, // obsolete
QueryReferences = 5, // obsolete
QueryReferrers = 6,
AddToStore = 7,
AddTextToStore = 8, // obsolete since 1.25, Nix 3.0. Use WorkerProto::Op::AddToStore
BuildPaths = 9,
EnsurePath = 10,
AddTempRoot = 11,
AddIndirectRoot = 12,
SyncWithGC = 13,
FindRoots = 14,
ExportPath = 16, // obsolete
QueryDeriver = 18, // obsolete
SetOptions = 19,
CollectGarbage = 20,
QuerySubstitutablePathInfo = 21,
QueryDerivationOutputs = 22, // obsolete
QueryAllValidPaths = 23,
QueryFailedPaths = 24,
ClearFailedPaths = 25,
QueryPathInfo = 26,
ImportPaths = 27, // obsolete
QueryDerivationOutputNames = 28, // obsolete
QueryPathFromHashPart = 29,
QuerySubstitutablePathInfos = 30,
QueryValidPaths = 31,
QuerySubstitutablePaths = 32,
QueryValidDerivers = 33,
OptimiseStore = 34,
VerifyStore = 35,
BuildDerivation = 36,
AddSignatures = 37,
NarFromPath = 38,
AddToStoreNar = 39,
QueryMissing = 40,
QueryDerivationOutputMap = 41,
RegisterDrvOutput = 42,
QueryRealisation = 43,
AddMultipleToStore = 44,
AddBuildLog = 45,
BuildPathsWithResults = 46,
}; };
/** /**
* Wrapper function around `WorkerProto<T>::write` that allows us to * Convenience for sending operation codes.
* infer the type instead of having to write it down explicitly. *
* @todo Switch to using `WorkerProto::Serialise` instead probably. But
* this was not done at this time so there would be less churn.
*/ */
template<typename T> inline Sink & operator << (Sink & sink, WorkerProto::Op op)
void workerProtoWrite(const Store & store, Sink & out, const T & t)
{ {
WorkerProto<T>::write(store, out, t); return sink << (uint64_t) op;
} }
/** /**
* Declare a canonical serializer pair for the worker protocol. * Convenience for debugging.
* *
* We specialize the struct merely to indicate that we are implementing * @todo Perhaps render known opcodes more nicely.
*/
inline std::ostream & operator << (std::ostream & s, WorkerProto::Op op)
{
return s << (uint64_t) op;
}
/**
* Declare a canonical serialiser pair for the worker protocol.
*
* We specialise the struct merely to indicate that we are implementing
* the function for the given type. * the function for the given type.
* *
* Some sort of `template<...>` must be used with the caller for this to * Some sort of `template<...>` must be used with the caller for this to
@ -120,9 +192,9 @@ void workerProtoWrite(const Store & store, Sink & out, const T & t)
* practice. * practice.
*/ */
#define MAKE_WORKER_PROTO(T) \ #define MAKE_WORKER_PROTO(T) \
struct WorkerProto< T > { \ struct WorkerProto::Serialise< T > { \
static T read(const Store & store, Source & from); \ static T read(const Store & store, WorkerProto::ReadConn conn); \
static void write(const Store & store, Sink & out, const T & t); \ static void write(const Store & store, WorkerProto::WriteConn conn, const T & t); \
}; };
template<> template<>
@ -156,7 +228,7 @@ MAKE_WORKER_PROTO(X_);
/** /**
* These use the empty string for the null case, relying on the fact * These use the empty string for the null case, relying on the fact
* that the underlying types never serialize to the empty string. * that the underlying types never serialise to the empty string.
* *
* We do this instead of a generic std::optional<T> instance because * We do this instead of a generic std::optional<T> instance because
* ordinal tags (0 or 1, here) are a bit of a compatability hazard. For * ordinal tags (0 or 1, here) are a bit of a compatability hazard. For
@ -173,67 +245,4 @@ MAKE_WORKER_PROTO(std::optional<StorePath>);
template<> template<>
MAKE_WORKER_PROTO(std::optional<ContentAddress>); MAKE_WORKER_PROTO(std::optional<ContentAddress>);
template<typename T>
std::vector<T> WorkerProto<std::vector<T>>::read(const Store & store, Source & from)
{
std::vector<T> resSet;
auto size = readNum<size_t>(from);
while (size--) {
resSet.push_back(WorkerProto<T>::read(store, from));
}
return resSet;
}
template<typename T>
void WorkerProto<std::vector<T>>::write(const Store & store, Sink & out, const std::vector<T> & resSet)
{
out << resSet.size();
for (auto & key : resSet) {
WorkerProto<T>::write(store, out, key);
}
}
template<typename T>
std::set<T> WorkerProto<std::set<T>>::read(const Store & store, Source & from)
{
std::set<T> resSet;
auto size = readNum<size_t>(from);
while (size--) {
resSet.insert(WorkerProto<T>::read(store, from));
}
return resSet;
}
template<typename T>
void WorkerProto<std::set<T>>::write(const Store & store, Sink & out, const std::set<T> & resSet)
{
out << resSet.size();
for (auto & key : resSet) {
WorkerProto<T>::write(store, out, key);
}
}
template<typename K, typename V>
std::map<K, V> WorkerProto<std::map<K, V>>::read(const Store & store, Source & from)
{
std::map<K, V> resMap;
auto size = readNum<size_t>(from);
while (size--) {
auto k = WorkerProto<K>::read(store, from);
auto v = WorkerProto<V>::read(store, from);
resMap.insert_or_assign(std::move(k), std::move(v));
}
return resMap;
}
template<typename K, typename V>
void WorkerProto<std::map<K, V>>::write(const Store & store, Sink & out, const std::map<K, V> & resMap)
{
out << resMap.size();
for (auto & i : resMap) {
WorkerProto<K>::write(store, out, i.first);
WorkerProto<V>::write(store, out, i.second);
}
}
} }

View file

@ -4,6 +4,9 @@
* *
* Template implementations (as opposed to mere declarations). * Template implementations (as opposed to mere declarations).
* *
* This file is an exmample of the "impl.hh" pattern. See the
* contributing guide.
*
* One only needs to include this when one is declaring a * One only needs to include this when one is declaring a
* `BaseClass<CustomType>` setting, or as derived class of such an * `BaseClass<CustomType>` setting, or as derived class of such an
* instantiation. * instantiation.

View file

@ -12,6 +12,7 @@
#include "shared.hh" #include "shared.hh"
#include "util.hh" #include "util.hh"
#include "worker-protocol.hh" #include "worker-protocol.hh"
#include "worker-protocol-impl.hh"
#include "graphml.hh" #include "graphml.hh"
#include "legacy.hh" #include "legacy.hh"
#include "path-with-outputs.hh" #include "path-with-outputs.hh"
@ -806,6 +807,9 @@ static void opServe(Strings opFlags, Strings opArgs)
out.flush(); out.flush();
unsigned int clientVersion = readInt(in); unsigned int clientVersion = readInt(in);
WorkerProto::ReadConn rconn { .from = in };
WorkerProto::WriteConn wconn { .to = out };
auto getBuildSettings = [&]() { auto getBuildSettings = [&]() {
// FIXME: changing options here doesn't work if we're // FIXME: changing options here doesn't work if we're
// building through the daemon. // building through the daemon.
@ -837,19 +841,19 @@ static void opServe(Strings opFlags, Strings opArgs)
}; };
while (true) { while (true) {
ServeCommand cmd; ServeProto::Command cmd;
try { try {
cmd = (ServeCommand) readInt(in); cmd = (ServeProto::Command) readInt(in);
} catch (EndOfFile & e) { } catch (EndOfFile & e) {
break; break;
} }
switch (cmd) { switch (cmd) {
case cmdQueryValidPaths: { case ServeProto::Command::QueryValidPaths: {
bool lock = readInt(in); bool lock = readInt(in);
bool substitute = readInt(in); bool substitute = readInt(in);
auto paths = WorkerProto<StorePathSet>::read(*store, in); auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
if (lock && writeAllowed) if (lock && writeAllowed)
for (auto & path : paths) for (auto & path : paths)
store->addTempRoot(path); store->addTempRoot(path);
@ -858,19 +862,19 @@ static void opServe(Strings opFlags, Strings opArgs)
store->substitutePaths(paths); store->substitutePaths(paths);
} }
workerProtoWrite(*store, out, store->queryValidPaths(paths)); WorkerProto::write(*store, wconn, store->queryValidPaths(paths));
break; break;
} }
case cmdQueryPathInfos: { case ServeProto::Command::QueryPathInfos: {
auto paths = WorkerProto<StorePathSet>::read(*store, in); auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
// !!! Maybe we want a queryPathInfos? // !!! Maybe we want a queryPathInfos?
for (auto & i : paths) { for (auto & i : paths) {
try { try {
auto info = store->queryPathInfo(i); auto info = store->queryPathInfo(i);
out << store->printStorePath(info->path) out << store->printStorePath(info->path)
<< (info->deriver ? store->printStorePath(*info->deriver) : ""); << (info->deriver ? store->printStorePath(*info->deriver) : "");
workerProtoWrite(*store, out, info->references); WorkerProto::write(*store, wconn, info->references);
// !!! Maybe we want compression? // !!! Maybe we want compression?
out << info->narSize // downloadSize out << info->narSize // downloadSize
<< info->narSize; << info->narSize;
@ -885,24 +889,24 @@ static void opServe(Strings opFlags, Strings opArgs)
break; break;
} }
case cmdDumpStorePath: case ServeProto::Command::DumpStorePath:
store->narFromPath(store->parseStorePath(readString(in)), out); store->narFromPath(store->parseStorePath(readString(in)), out);
break; break;
case cmdImportPaths: { case ServeProto::Command::ImportPaths: {
if (!writeAllowed) throw Error("importing paths is not allowed"); if (!writeAllowed) throw Error("importing paths is not allowed");
store->importPaths(in, NoCheckSigs); // FIXME: should we skip sig checking? store->importPaths(in, NoCheckSigs); // FIXME: should we skip sig checking?
out << 1; // indicate success out << 1; // indicate success
break; break;
} }
case cmdExportPaths: { case ServeProto::Command::ExportPaths: {
readInt(in); // obsolete readInt(in); // obsolete
store->exportPaths(WorkerProto<StorePathSet>::read(*store, in), out); store->exportPaths(WorkerProto::Serialise<StorePathSet>::read(*store, rconn), out);
break; break;
} }
case cmdBuildPaths: { case ServeProto::Command::BuildPaths: {
if (!writeAllowed) throw Error("building paths is not allowed"); if (!writeAllowed) throw Error("building paths is not allowed");
@ -923,7 +927,7 @@ static void opServe(Strings opFlags, Strings opArgs)
break; break;
} }
case cmdBuildDerivation: { /* Used by hydra-queue-runner. */ case ServeProto::Command::BuildDerivation: { /* Used by hydra-queue-runner. */
if (!writeAllowed) throw Error("building paths is not allowed"); if (!writeAllowed) throw Error("building paths is not allowed");
@ -944,22 +948,22 @@ static void opServe(Strings opFlags, Strings opArgs)
DrvOutputs builtOutputs; DrvOutputs builtOutputs;
for (auto & [output, realisation] : status.builtOutputs) for (auto & [output, realisation] : status.builtOutputs)
builtOutputs.insert_or_assign(realisation.id, realisation); builtOutputs.insert_or_assign(realisation.id, realisation);
workerProtoWrite(*store, out, builtOutputs); WorkerProto::write(*store, wconn, builtOutputs);
} }
break; break;
} }
case cmdQueryClosure: { case ServeProto::Command::QueryClosure: {
bool includeOutputs = readInt(in); bool includeOutputs = readInt(in);
StorePathSet closure; StorePathSet closure;
store->computeFSClosure(WorkerProto<StorePathSet>::read(*store, in), store->computeFSClosure(WorkerProto::Serialise<StorePathSet>::read(*store, rconn),
closure, false, includeOutputs); closure, false, includeOutputs);
workerProtoWrite(*store, out, closure); WorkerProto::write(*store, wconn, closure);
break; break;
} }
case cmdAddToStoreNar: { case ServeProto::Command::AddToStoreNar: {
if (!writeAllowed) throw Error("importing paths is not allowed"); if (!writeAllowed) throw Error("importing paths is not allowed");
auto path = readString(in); auto path = readString(in);
@ -970,7 +974,7 @@ static void opServe(Strings opFlags, Strings opArgs)
}; };
if (deriver != "") if (deriver != "")
info.deriver = store->parseStorePath(deriver); info.deriver = store->parseStorePath(deriver);
info.references = WorkerProto<StorePathSet>::read(*store, in); info.references = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
in >> info.registrationTime >> info.narSize >> info.ultimate; in >> info.registrationTime >> info.narSize >> info.ultimate;
info.sigs = readStrings<StringSet>(in); info.sigs = readStrings<StringSet>(in);
info.ca = ContentAddress::parseOpt(readString(in)); info.ca = ContentAddress::parseOpt(readString(in));

View file

@ -4,6 +4,7 @@
#include "shared.hh" #include "shared.hh"
#include "local-store.hh" #include "local-store.hh"
#include "remote-store.hh" #include "remote-store.hh"
#include "remote-store-connection.hh"
#include "util.hh" #include "util.hh"
#include "serialise.hh" #include "serialise.hh"
#include "archive.hh" #include "archive.hh"