From 2bda7ca642528e1a7c7188d72460fe90c89c4ebf Mon Sep 17 00:00:00 2001 From: John Ericson Date: Thu, 30 Nov 2023 11:31:58 -0500 Subject: [PATCH] Further use `Machine::Connection` to deduplicate --- src/hydra-queue-runner/build-remote.cc | 28 +++++++++++++------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index a82a95e6..8f8d6532 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -107,27 +107,27 @@ static void openConnection(Machine::ptr machine, Path tmpDir, int stderrFD, Chil } -static void copyClosureTo(std::timed_mutex & sendMutex, Store & destStore, - FdSource & from, FdSink & to, const StorePathSet & paths, +static void copyClosureTo( + Machine::Connection & conn, + Store & destStore, + const StorePathSet & paths, bool useSubstitutes = false) { StorePathSet closure; destStore.computeFSClosure(paths, closure); - WorkerProto::WriteConn wconn { .to = to }; - WorkerProto::ReadConn rconn { .from = from }; /* Send the "query valid paths" command with the "lock" option enabled. This prevents a race where the remote host garbage-collect paths that are already there. Optionally, ask the remote host to substitute missing paths. */ // FIXME: substitute output pollutes our build log - to << ServeProto::Command::QueryValidPaths << 1 << useSubstitutes; - WorkerProto::write(destStore, wconn, closure); - to.flush(); + conn.to << ServeProto::Command::QueryValidPaths << 1 << useSubstitutes; + WorkerProto::write(destStore, conn, closure); + conn.to.flush(); /* Get back the set of paths that are already valid on the remote host. */ - auto present = WorkerProto::Serialise::read(destStore, rconn); + auto present = WorkerProto::Serialise::read(destStore, conn); if (present.size() == closure.size()) return; @@ -139,14 +139,14 @@ static void copyClosureTo(std::timed_mutex & sendMutex, Store & destStore, printMsg(lvlDebug, "sending %d missing paths", missing.size()); - std::unique_lock sendLock(sendMutex, + std::unique_lock sendLock(conn.machine->state->sendLock, std::chrono::seconds(600)); - to << ServeProto::Command::ImportPaths; - destStore.exportPaths(missing, to); - to.flush(); + conn.to << ServeProto::Command::ImportPaths; + destStore.exportPaths(missing, conn.to); + conn.to.flush(); - if (readInt(from) != 1) + if (readInt(conn.from) != 1) throw Error("remote machine failed to import closure"); } @@ -257,7 +257,7 @@ BasicDerivation sendInputs( destStore.computeFSClosure(basicDrv.inputSrcs, closure); copyPaths(destStore, localStore, closure, NoRepair, NoCheckSigs, NoSubstitute); } else { - copyClosureTo(conn.machine->state->sendLock, destStore, conn.from, conn.to, basicDrv.inputSrcs, true); + copyClosureTo(conn, destStore, basicDrv.inputSrcs, true); } auto now2 = std::chrono::steady_clock::now();