diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index f19008dc..058af02c 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -8,6 +8,7 @@ #include "build-result.hh" #include "path.hh" #include "serve-protocol.hh" +#include "serve-protocol-impl.hh" #include "state.hh" #include "current-process.hh" #include "processes.hh" @@ -123,13 +124,10 @@ static void copyClosureTo( garbage-collect paths that are already there. Optionally, ask the remote host to substitute missing paths. */ // FIXME: substitute output pollutes our build log - conn.to << ServeProto::Command::QueryValidPaths << 1 << useSubstitutes; - ServeProto::write(destStore, conn, closure); - conn.to.flush(); - /* Get back the set of paths that are already valid on the remote host. */ - auto present = ServeProto::Serialise::read(destStore, conn); + auto present = conn.queryValidPaths( + destStore, true, closure, useSubstitutes); if (present.size() == closure.size()) return; @@ -195,33 +193,6 @@ static std::pair openLogFile(const std::string & logDir, cons return {std::move(logFile), std::move(logFD)}; } -/** - * @param conn is not fully initialized; it is this functions job to set - * the `remoteVersion` field after the handshake is completed. - * Therefore, no `ServeProto::Serialize` functions can be used until - * that field is set. - */ -static void handshake(Machine::Connection & conn, unsigned int repeats) -{ - constexpr ServeProto::Version our_version = 0x206; - - conn.to << SERVE_MAGIC_1 << our_version; - conn.to.flush(); - - unsigned int magic = readInt(conn.from); - if (magic != SERVE_MAGIC_2) - throw Error("protocol mismatch with ‘nix-store --serve’ on ‘%1%’", conn.machine->sshName); - conn.remoteVersion = readInt(conn.from); - // Now `conn` is initialized. - if (GET_PROTOCOL_MAJOR(conn.remoteVersion) != 0x200) - throw Error("unsupported ‘nix-store --serve’ protocol version on ‘%1%’", conn.machine->sshName); - if (GET_PROTOCOL_MINOR(conn.remoteVersion) < 3 && repeats > 0) - throw Error("machine ‘%1%’ does not support repeating a build; please upgrade it to Nix 1.12", conn.machine->sshName); - - // Do not attempt to speak a newer version of the protocol - conn.remoteVersion = std::min(conn.remoteVersion, our_version); -} - static BasicDerivation sendInputs( State & state, Step & step, @@ -291,10 +262,7 @@ static BuildResult performBuild( counter & nrStepsBuilding ) { - conn.to << ServeProto::Command::BuildDerivation << localStore.printStorePath(drvPath); - writeDerivation(conn.to, localStore, drv); - ServeProto::write(localStore, conn, options); - conn.to.flush(); + conn.putBuildDerivationRequest(localStore, drvPath, drv, options); BuildResult result; @@ -498,9 +466,13 @@ void State::buildRemote(ref destStore, }); Machine::Connection conn { - .from = child.from.get(), - .to = child.to.get(), - .machine = machine, + { + .to = child.to.get(), + .from = child.from.get(), + /* Handshake. */ + .remoteVersion = 0xdadbeef, // FIXME avoid dummy initialize + }, + /*.machine =*/ machine, }; Finally updateStats([&]() { @@ -508,14 +480,26 @@ void State::buildRemote(ref destStore, bytesSent += conn.to.written; }); + constexpr ServeProto::Version our_version = 0x206; + try { - build_remote::handshake(conn, buildOptions.nrRepeats); + conn.remoteVersion = decltype(conn)::handshake( + conn.to, + conn.from, + our_version, + machine->sshName); } catch (EndOfFile & e) { child.pid.wait(); std::string s = chomp(readFile(result.logFile)); throw Error("cannot connect to ‘%1%’: %2%", machine->sshName, s); } + // Do not attempt to speak a newer version of the protocol. + // + // Per https://github.com/NixOS/nix/issues/9584 should be handled as + // part of `handshake` in upstream nix. + conn.remoteVersion = std::min(conn.remoteVersion, our_version); + { auto info(machine->state->connectInfo.lock()); info->consecutiveFailures = 0; diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index 830f0598..82481ab5 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -22,6 +22,7 @@ #include "sync.hh" #include "nar-extractor.hh" #include "serve-protocol.hh" +#include "serve-protocol-impl.hh" typedef unsigned int BuildID; @@ -302,29 +303,9 @@ struct Machine } // A connection to a machine - struct Connection { - nix::FdSource from; - nix::FdSink to; - nix::ServeProto::Version remoteVersion; - + struct Connection : nix::ServeProto::BasicClientConnection { // Backpointer to the machine ptr machine; - - operator nix::ServeProto::ReadConn () - { - return { - .from = from, - .version = remoteVersion, - }; - } - - operator nix::ServeProto::WriteConn () - { - return { - .to = to, - .version = remoteVersion, - }; - } }; };