Merge pull request #9137 from obsidiansystems/serve-protocol

Introduce separate Serve protocol serialisers
This commit is contained in:
John Ericson 2023-10-13 10:51:46 -04:00 committed by GitHub
commit d070d8b746
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 344 additions and 39 deletions

View file

@ -3,11 +3,10 @@
#include "pool.hh" #include "pool.hh"
#include "remote-store.hh" #include "remote-store.hh"
#include "serve-protocol.hh" #include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "build-result.hh" #include "build-result.hh"
#include "store-api.hh" #include "store-api.hh"
#include "path-with-outputs.hh" #include "path-with-outputs.hh"
#include "common-protocol.hh"
#include "common-protocol-impl.hh"
#include "ssh.hh" #include "ssh.hh"
#include "derivations.hh" #include "derivations.hh"
#include "callback.hh" #include "callback.hh"
@ -50,37 +49,31 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor
bool good = true; bool good = true;
/** /**
* Coercion to `CommonProto::ReadConn`. This makes it easy to use the * Coercion to `ServeProto::ReadConn`. This makes it easy to use the
* factored out common protocol serialisers with a * factored out serve protocol searlizers with a
* `LegacySSHStore::Connection`. * `LegacySSHStore::Connection`.
* *
* The common protocol connection types are unidirectional, unlike * The serve protocol connection types are unidirectional, unlike
* this type. * this type.
*
* @todo Use server protocol serializers, not common protocol
* serializers, once we have made that distiction.
*/ */
operator CommonProto::ReadConn () operator ServeProto::ReadConn ()
{ {
return CommonProto::ReadConn { return ServeProto::ReadConn {
.from = from, .from = from,
}; };
} }
/* /*
* Coercion to `CommonProto::WriteConn`. This makes it easy to use the * Coercion to `ServeProto::WriteConn`. This makes it easy to use the
* factored out common protocol searlizers with a * factored out serve protocol searlizers with a
* `LegacySSHStore::Connection`. * `LegacySSHStore::Connection`.
* *
* The common protocol connection types are unidirectional, unlike * The serve protocol connection types are unidirectional, unlike
* this type. * this type.
*
* @todo Use server protocol serializers, not common protocol
* serializers, once we have made that distiction.
*/ */
operator CommonProto::WriteConn () operator ServeProto::WriteConn ()
{ {
return CommonProto::WriteConn { return ServeProto::WriteConn {
.to = to, .to = to,
}; };
} }
@ -183,7 +176,7 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor
auto deriver = readString(conn->from); auto deriver = readString(conn->from);
if (deriver != "") if (deriver != "")
info->deriver = parseStorePath(deriver); info->deriver = parseStorePath(deriver);
info->references = CommonProto::Serialise<StorePathSet>::read(*this, *conn); info->references = ServeProto::Serialise<StorePathSet>::read(*this, *conn);
readLongLong(conn->from); // download size readLongLong(conn->from); // download size
info->narSize = readLongLong(conn->from); info->narSize = readLongLong(conn->from);
@ -217,7 +210,7 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor
<< printStorePath(info.path) << printStorePath(info.path)
<< (info.deriver ? printStorePath(*info.deriver) : "") << (info.deriver ? printStorePath(*info.deriver) : "")
<< info.narHash.to_string(Base16, false); << info.narHash.to_string(Base16, false);
CommonProto::write(*this, *conn, info.references); ServeProto::write(*this, *conn, info.references);
conn->to conn->to
<< info.registrationTime << info.registrationTime
<< info.narSize << info.narSize
@ -246,7 +239,7 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor
conn->to conn->to
<< exportMagic << exportMagic
<< printStorePath(info.path); << printStorePath(info.path);
CommonProto::write(*this, *conn, info.references); ServeProto::write(*this, *conn, info.references);
conn->to conn->to
<< (info.deriver ? printStorePath(*info.deriver) : "") << (info.deriver ? printStorePath(*info.deriver) : "")
<< 0 << 0
@ -331,7 +324,7 @@ public:
if (GET_PROTOCOL_MINOR(conn->remoteVersion) >= 3) if (GET_PROTOCOL_MINOR(conn->remoteVersion) >= 3)
conn->from >> status.timesBuilt >> status.isNonDeterministic >> status.startTime >> status.stopTime; conn->from >> status.timesBuilt >> status.isNonDeterministic >> status.startTime >> status.stopTime;
if (GET_PROTOCOL_MINOR(conn->remoteVersion) >= 6) { if (GET_PROTOCOL_MINOR(conn->remoteVersion) >= 6) {
auto builtOutputs = CommonProto::Serialise<DrvOutputs>::read(*this, *conn); auto builtOutputs = ServeProto::Serialise<DrvOutputs>::read(*this, *conn);
for (auto && [output, realisation] : builtOutputs) for (auto && [output, realisation] : builtOutputs)
status.builtOutputs.insert_or_assign( status.builtOutputs.insert_or_assign(
std::move(output.outputName), std::move(output.outputName),
@ -409,10 +402,10 @@ public:
conn->to conn->to
<< ServeProto::Command::QueryClosure << ServeProto::Command::QueryClosure
<< includeOutputs; << includeOutputs;
CommonProto::write(*this, *conn, paths); ServeProto::write(*this, *conn, paths);
conn->to.flush(); conn->to.flush();
for (auto & i : CommonProto::Serialise<StorePathSet>::read(*this, *conn)) for (auto & i : ServeProto::Serialise<StorePathSet>::read(*this, *conn))
out.insert(i); out.insert(i);
} }
@ -425,10 +418,10 @@ public:
<< ServeProto::Command::QueryValidPaths << ServeProto::Command::QueryValidPaths
<< false // lock << false // lock
<< maybeSubstitute; << maybeSubstitute;
CommonProto::write(*this, *conn, paths); ServeProto::write(*this, *conn, paths);
conn->to.flush(); conn->to.flush();
return CommonProto::Serialise<StorePathSet>::read(*this, *conn); return ServeProto::Serialise<StorePathSet>::read(*this, *conn);
} }
void connect() override void connect() override

View file

@ -0,0 +1,59 @@
#pragma once
/**
* @file
*
* Template implementations (as opposed to mere declarations).
*
* This file is an exmample of the "impl.hh" pattern. See the
* contributing guide.
*/
#include "serve-protocol.hh"
#include "length-prefixed-protocol-helper.hh"
namespace nix {
/* protocol-agnostic templates */
#define SERVE_USE_LENGTH_PREFIX_SERIALISER(TEMPLATE, T) \
TEMPLATE T ServeProto::Serialise< T >::read(const Store & store, ServeProto::ReadConn conn) \
{ \
return LengthPrefixedProtoHelper<ServeProto, T >::read(store, conn); \
} \
TEMPLATE void ServeProto::Serialise< T >::write(const Store & store, ServeProto::WriteConn conn, const T & t) \
{ \
LengthPrefixedProtoHelper<ServeProto, T >::write(store, conn, t); \
}
SERVE_USE_LENGTH_PREFIX_SERIALISER(template<typename T>, std::vector<T>)
SERVE_USE_LENGTH_PREFIX_SERIALISER(template<typename T>, std::set<T>)
SERVE_USE_LENGTH_PREFIX_SERIALISER(template<typename... Ts>, std::tuple<Ts...>)
#define COMMA_ ,
SERVE_USE_LENGTH_PREFIX_SERIALISER(
template<typename K COMMA_ typename V>,
std::map<K COMMA_ V>)
#undef COMMA_
/**
* Use `CommonProto` where possible.
*/
template<typename T>
struct ServeProto::Serialise
{
static T read(const Store & store, ServeProto::ReadConn conn)
{
return CommonProto::Serialise<T>::read(store,
CommonProto::ReadConn { .from = conn.from });
}
static void write(const Store & store, ServeProto::WriteConn conn, const T & t)
{
CommonProto::Serialise<T>::write(store,
CommonProto::WriteConn { .to = conn.to },
t);
}
};
/* protocol-specific templates */
}

View file

@ -0,0 +1,15 @@
#include "serialise.hh"
#include "util.hh"
#include "path-with-outputs.hh"
#include "store-api.hh"
#include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "archive.hh"
#include <nlohmann/json.hpp>
namespace nix {
/* protocol-specific definitions */
}

View file

@ -1,6 +1,8 @@
#pragma once #pragma once
///@file ///@file
#include "common-protocol.hh"
namespace nix { namespace nix {
#define SERVE_MAGIC_1 0x390c9deb #define SERVE_MAGIC_1 0x390c9deb
@ -10,6 +12,11 @@ namespace nix {
#define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00) #define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00)
#define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff) #define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff)
class Store;
struct Source;
/** /**
* The "serve protocol", used by ssh:// stores. * The "serve protocol", used by ssh:// stores.
* *
@ -22,6 +29,57 @@ struct ServeProto
* Enumeration of all the request types for the protocol. * Enumeration of all the request types for the protocol.
*/ */
enum struct Command : uint64_t; enum struct Command : uint64_t;
/**
* A unidirectional read connection, to be used by the read half of the
* canonical serializers below.
*
* This currently is just a `Source &`, but more fields will be added
* later.
*/
struct ReadConn {
Source & from;
};
/**
* A unidirectional write connection, to be used by the write half of the
* canonical serializers below.
*
* This currently is just a `Sink &`, but more fields will be added
* later.
*/
struct WriteConn {
Sink & to;
};
/**
* Data type for canonical pairs of serialisers for the serve protocol.
*
* See https://en.cppreference.com/w/cpp/language/adl for the broader
* concept of what is going on here.
*/
template<typename T>
struct Serialise;
// This is the definition of `Serialise` we *want* to put here, but
// do not do so.
//
// See `worker-protocol.hh` for a longer explanation.
#if 0
{
static T read(const Store & store, ReadConn conn);
static void write(const Store & store, WriteConn conn, const T & t);
};
#endif
/**
* Wrapper function around `ServeProto::Serialise<T>::write` that allows us to
* infer the type instead of having to write it down explicitly.
*/
template<typename T>
static void write(const Store & store, WriteConn conn, const T & t)
{
ServeProto::Serialise<T>::write(store, conn, t);
}
}; };
enum struct ServeProto::Command : uint64_t enum struct ServeProto::Command : uint64_t
@ -58,4 +116,33 @@ inline std::ostream & operator << (std::ostream & s, ServeProto::Command op)
return s << (uint64_t) op; return s << (uint64_t) op;
} }
/**
* Declare a canonical serialiser pair for the worker protocol.
*
* We specialise the struct merely to indicate that we are implementing
* the function for the given type.
*
* Some sort of `template<...>` must be used with the caller for this to
* be legal specialization syntax. See below for what that looks like in
* practice.
*/
#define DECLARE_SERVE_SERIALISER(T) \
struct ServeProto::Serialise< T > \
{ \
static T read(const Store & store, ServeProto::ReadConn conn); \
static void write(const Store & store, ServeProto::WriteConn conn, const T & t); \
};
template<typename T>
DECLARE_SERVE_SERIALISER(std::vector<T>);
template<typename T>
DECLARE_SERVE_SERIALISER(std::set<T>);
template<typename... Ts>
DECLARE_SERVE_SERIALISER(std::tuple<Ts...>);
#define COMMA_ ,
template<typename K, typename V>
DECLARE_SERVE_SERIALISER(std::map<K COMMA_ V>);
#undef COMMA_
} }

View file

@ -0,0 +1,152 @@
#include <regex>
#include <nlohmann/json.hpp>
#include <gtest/gtest.h>
#include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "build-result.hh"
#include "tests/protocol.hh"
#include "tests/characterization.hh"
namespace nix {
const char commonProtoDir[] = "serve-protocol";
using ServeProtoTest = ProtoTest<ServeProto, commonProtoDir>;
CHARACTERIZATION_TEST(
ServeProtoTest,
string,
"string",
(std::tuple<std::string, std::string, std::string, std::string, std::string> {
"",
"hi",
"white rabbit",
"大白兔",
"oh no \0\0\0 what was that!",
}))
CHARACTERIZATION_TEST(
ServeProtoTest,
storePath,
"store-path",
(std::tuple<StorePath, StorePath> {
StorePath { "g1w7hy3qg1w7hy3qg1w7hy3qg1w7hy3q-foo" },
StorePath { "g1w7hy3qg1w7hy3qg1w7hy3qg1w7hy3q-foo-bar" },
}))
CHARACTERIZATION_TEST(
ServeProtoTest,
contentAddress,
"content-address",
(std::tuple<ContentAddress, ContentAddress, ContentAddress> {
ContentAddress {
.method = TextIngestionMethod {},
.hash = hashString(HashType::htSHA256, "Derive(...)"),
},
ContentAddress {
.method = FileIngestionMethod::Flat,
.hash = hashString(HashType::htSHA1, "blob blob..."),
},
ContentAddress {
.method = FileIngestionMethod::Recursive,
.hash = hashString(HashType::htSHA256, "(...)"),
},
}))
CHARACTERIZATION_TEST(
ServeProtoTest,
drvOutput,
"drv-output",
(std::tuple<DrvOutput, DrvOutput> {
{
.drvHash = Hash::parseSRI("sha256-FePFYIlMuycIXPZbWi7LGEiMmZSX9FMbaQenWBzm1Sc="),
.outputName = "baz",
},
DrvOutput {
.drvHash = Hash::parseSRI("sha256-b4afnqKCO9oWXgYHb9DeQ2berSwOjS27rSd9TxXDc/U="),
.outputName = "quux",
},
}))
CHARACTERIZATION_TEST(
ServeProtoTest,
realisation,
"realisation",
(std::tuple<Realisation, Realisation> {
Realisation {
.id = DrvOutput {
.drvHash = Hash::parseSRI("sha256-FePFYIlMuycIXPZbWi7LGEiMmZSX9FMbaQenWBzm1Sc="),
.outputName = "baz",
},
.outPath = StorePath { "g1w7hy3qg1w7hy3qg1w7hy3qg1w7hy3q-foo" },
.signatures = { "asdf", "qwer" },
},
Realisation {
.id = {
.drvHash = Hash::parseSRI("sha256-FePFYIlMuycIXPZbWi7LGEiMmZSX9FMbaQenWBzm1Sc="),
.outputName = "baz",
},
.outPath = StorePath { "g1w7hy3qg1w7hy3qg1w7hy3qg1w7hy3q-foo" },
.signatures = { "asdf", "qwer" },
.dependentRealisations = {
{
DrvOutput {
.drvHash = Hash::parseSRI("sha256-b4afnqKCO9oWXgYHb9DeQ2berSwOjS27rSd9TxXDc/U="),
.outputName = "quux",
},
StorePath { "g1w7hy3qg1w7hy3qg1w7hy3qg1w7hy3q-foo" },
},
},
},
}))
CHARACTERIZATION_TEST(
ServeProtoTest,
vector,
"vector",
(std::tuple<std::vector<std::string>, std::vector<std::string>, std::vector<std::string>, std::vector<std::vector<std::string>>> {
{ },
{ "" },
{ "", "foo", "bar" },
{ {}, { "" }, { "", "1", "2" } },
}))
CHARACTERIZATION_TEST(
ServeProtoTest,
set,
"set",
(std::tuple<std::set<std::string>, std::set<std::string>, std::set<std::string>, std::set<std::set<std::string>>> {
{ },
{ "" },
{ "", "foo", "bar" },
{ {}, { "" }, { "", "1", "2" } },
}))
CHARACTERIZATION_TEST(
ServeProtoTest,
optionalStorePath,
"optional-store-path",
(std::tuple<std::optional<StorePath>, std::optional<StorePath>> {
std::nullopt,
std::optional {
StorePath { "g1w7hy3qg1w7hy3qg1w7hy3qg1w7hy3q-foo-bar" },
},
}))
CHARACTERIZATION_TEST(
ServeProtoTest,
optionalContentAddress,
"optional-content-address",
(std::tuple<std::optional<ContentAddress>, std::optional<ContentAddress>> {
std::nullopt,
std::optional {
ContentAddress {
.method = FileIngestionMethod::Flat,
.hash = hashString(HashType::htSHA1, "blob blob..."),
},
},
}))
}

View file

@ -9,10 +9,9 @@
#include "local-store.hh" #include "local-store.hh"
#include "monitor-fd.hh" #include "monitor-fd.hh"
#include "serve-protocol.hh" #include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "shared.hh" #include "shared.hh"
#include "util.hh" #include "util.hh"
#include "common-protocol.hh"
#include "common-protocol-impl.hh"
#include "graphml.hh" #include "graphml.hh"
#include "legacy.hh" #include "legacy.hh"
#include "path-with-outputs.hh" #include "path-with-outputs.hh"
@ -821,8 +820,8 @@ static void opServe(Strings opFlags, Strings opArgs)
out.flush(); out.flush();
unsigned int clientVersion = readInt(in); unsigned int clientVersion = readInt(in);
CommonProto::ReadConn rconn { .from = in }; ServeProto::ReadConn rconn { .from = in };
CommonProto::WriteConn wconn { .to = out }; ServeProto::WriteConn wconn { .to = out };
auto getBuildSettings = [&]() { auto getBuildSettings = [&]() {
// FIXME: changing options here doesn't work if we're // FIXME: changing options here doesn't work if we're
@ -867,7 +866,7 @@ static void opServe(Strings opFlags, Strings opArgs)
case ServeProto::Command::QueryValidPaths: { case ServeProto::Command::QueryValidPaths: {
bool lock = readInt(in); bool lock = readInt(in);
bool substitute = readInt(in); bool substitute = readInt(in);
auto paths = CommonProto::Serialise<StorePathSet>::read(*store, rconn); auto paths = ServeProto::Serialise<StorePathSet>::read(*store, rconn);
if (lock && writeAllowed) if (lock && writeAllowed)
for (auto & path : paths) for (auto & path : paths)
store->addTempRoot(path); store->addTempRoot(path);
@ -876,19 +875,19 @@ static void opServe(Strings opFlags, Strings opArgs)
store->substitutePaths(paths); store->substitutePaths(paths);
} }
CommonProto::write(*store, wconn, store->queryValidPaths(paths)); ServeProto::write(*store, wconn, store->queryValidPaths(paths));
break; break;
} }
case ServeProto::Command::QueryPathInfos: { case ServeProto::Command::QueryPathInfos: {
auto paths = CommonProto::Serialise<StorePathSet>::read(*store, rconn); auto paths = ServeProto::Serialise<StorePathSet>::read(*store, rconn);
// !!! Maybe we want a queryPathInfos? // !!! Maybe we want a queryPathInfos?
for (auto & i : paths) { for (auto & i : paths) {
try { try {
auto info = store->queryPathInfo(i); auto info = store->queryPathInfo(i);
out << store->printStorePath(info->path) out << store->printStorePath(info->path)
<< (info->deriver ? store->printStorePath(*info->deriver) : ""); << (info->deriver ? store->printStorePath(*info->deriver) : "");
CommonProto::write(*store, wconn, info->references); ServeProto::write(*store, wconn, info->references);
// !!! Maybe we want compression? // !!! Maybe we want compression?
out << info->narSize // downloadSize out << info->narSize // downloadSize
<< info->narSize; << info->narSize;
@ -916,7 +915,7 @@ static void opServe(Strings opFlags, Strings opArgs)
case ServeProto::Command::ExportPaths: { case ServeProto::Command::ExportPaths: {
readInt(in); // obsolete readInt(in); // obsolete
store->exportPaths(CommonProto::Serialise<StorePathSet>::read(*store, rconn), out); store->exportPaths(ServeProto::Serialise<StorePathSet>::read(*store, rconn), out);
break; break;
} }
@ -962,7 +961,7 @@ static void opServe(Strings opFlags, Strings opArgs)
DrvOutputs builtOutputs; DrvOutputs builtOutputs;
for (auto & [output, realisation] : status.builtOutputs) for (auto & [output, realisation] : status.builtOutputs)
builtOutputs.insert_or_assign(realisation.id, realisation); builtOutputs.insert_or_assign(realisation.id, realisation);
CommonProto::write(*store, wconn, builtOutputs); ServeProto::write(*store, wconn, builtOutputs);
} }
break; break;
@ -971,9 +970,9 @@ static void opServe(Strings opFlags, Strings opArgs)
case ServeProto::Command::QueryClosure: { case ServeProto::Command::QueryClosure: {
bool includeOutputs = readInt(in); bool includeOutputs = readInt(in);
StorePathSet closure; StorePathSet closure;
store->computeFSClosure(CommonProto::Serialise<StorePathSet>::read(*store, rconn), store->computeFSClosure(ServeProto::Serialise<StorePathSet>::read(*store, rconn),
closure, false, includeOutputs); closure, false, includeOutputs);
CommonProto::write(*store, wconn, closure); ServeProto::write(*store, wconn, closure);
break; break;
} }
@ -988,7 +987,7 @@ static void opServe(Strings opFlags, Strings opArgs)
}; };
if (deriver != "") if (deriver != "")
info.deriver = store->parseStorePath(deriver); info.deriver = store->parseStorePath(deriver);
info.references = CommonProto::Serialise<StorePathSet>::read(*store, rconn); info.references = ServeProto::Serialise<StorePathSet>::read(*store, rconn);
in >> info.registrationTime >> info.narSize >> info.ultimate; in >> info.registrationTime >> info.narSize >> info.ultimate;
info.sigs = readStrings<StringSet>(in); info.sigs = readStrings<StringSet>(in);
info.ca = ContentAddress::parseOpt(readString(in)); info.ca = ContentAddress::parseOpt(readString(in));

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.