Further use Machine::Connection to deduplicate

This commit is contained in:
John Ericson 2023-11-30 11:31:58 -05:00
parent 831a2d9bd5
commit 2bda7ca642

View file

@ -107,27 +107,27 @@ static void openConnection(Machine::ptr machine, Path tmpDir, int stderrFD, Chil
} }
static void copyClosureTo(std::timed_mutex & sendMutex, Store & destStore, static void copyClosureTo(
FdSource & from, FdSink & to, const StorePathSet & paths, Machine::Connection & conn,
Store & destStore,
const StorePathSet & paths,
bool useSubstitutes = false) bool useSubstitutes = false)
{ {
StorePathSet closure; StorePathSet closure;
destStore.computeFSClosure(paths, closure); destStore.computeFSClosure(paths, closure);
WorkerProto::WriteConn wconn { .to = to };
WorkerProto::ReadConn rconn { .from = from };
/* Send the "query valid paths" command with the "lock" option /* Send the "query valid paths" command with the "lock" option
enabled. This prevents a race where the remote host enabled. This prevents a race where the remote host
garbage-collect paths that are already there. Optionally, ask garbage-collect paths that are already there. Optionally, ask
the remote host to substitute missing paths. */ the remote host to substitute missing paths. */
// FIXME: substitute output pollutes our build log // FIXME: substitute output pollutes our build log
to << ServeProto::Command::QueryValidPaths << 1 << useSubstitutes; conn.to << ServeProto::Command::QueryValidPaths << 1 << useSubstitutes;
WorkerProto::write(destStore, wconn, closure); WorkerProto::write(destStore, conn, closure);
to.flush(); conn.to.flush();
/* Get back the set of paths that are already valid on the remote /* Get back the set of paths that are already valid on the remote
host. */ host. */
auto present = WorkerProto::Serialise<StorePathSet>::read(destStore, rconn); auto present = WorkerProto::Serialise<StorePathSet>::read(destStore, conn);
if (present.size() == closure.size()) return; if (present.size() == closure.size()) return;
@ -139,14 +139,14 @@ static void copyClosureTo(std::timed_mutex & sendMutex, Store & destStore,
printMsg(lvlDebug, "sending %d missing paths", missing.size()); printMsg(lvlDebug, "sending %d missing paths", missing.size());
std::unique_lock<std::timed_mutex> sendLock(sendMutex, std::unique_lock<std::timed_mutex> sendLock(conn.machine->state->sendLock,
std::chrono::seconds(600)); std::chrono::seconds(600));
to << ServeProto::Command::ImportPaths; conn.to << ServeProto::Command::ImportPaths;
destStore.exportPaths(missing, to); destStore.exportPaths(missing, conn.to);
to.flush(); conn.to.flush();
if (readInt(from) != 1) if (readInt(conn.from) != 1)
throw Error("remote machine failed to import closure"); throw Error("remote machine failed to import closure");
} }
@ -257,7 +257,7 @@ BasicDerivation sendInputs(
destStore.computeFSClosure(basicDrv.inputSrcs, closure); destStore.computeFSClosure(basicDrv.inputSrcs, closure);
copyPaths(destStore, localStore, closure, NoRepair, NoCheckSigs, NoSubstitute); copyPaths(destStore, localStore, closure, NoRepair, NoCheckSigs, NoSubstitute);
} else { } else {
copyClosureTo(conn.machine->state->sendLock, destStore, conn.from, conn.to, basicDrv.inputSrcs, true); copyClosureTo(conn, destStore, basicDrv.inputSrcs, true);
} }
auto now2 = std::chrono::steady_clock::now(); auto now2 = std::chrono::steady_clock::now();