Factor more stuff out

This commit is contained in:
Théophane Hufschmitt 2022-03-21 11:35:38 +01:00
parent 2f494b7834
commit 9f1b911625

View file

@ -188,6 +188,87 @@ std::pair<Path, AutoCloseFD> openLogFile(const std::string & logDir, const Store
return {std::move(logFile), std::move(logFD)}; return {std::move(logFile), std::move(logFD)};
} }
void handshake(Machine::Connection & conn, unsigned int repeats)
{
conn.to << SERVE_MAGIC_1 << 0x204;
conn.to.flush();
unsigned int magic = readInt(conn.from);
if (magic != SERVE_MAGIC_2)
throw Error("protocol mismatch with nix-store --serve on %1%", conn.machine->sshName);
conn.remoteVersion = readInt(conn.from);
if (GET_PROTOCOL_MAJOR(conn.remoteVersion) != 0x200)
throw Error("unsupported nix-store --serve protocol version on %1%", conn.machine->sshName);
if (GET_PROTOCOL_MINOR(conn.remoteVersion) < 3 && repeats > 0)
throw Error("machine %1% does not support repeating a build; please upgrade it to Nix 1.12", conn.machine->sshName);
}
StorePathSet sendInputs(
State & state,
Step & step,
Store & localStore,
Store & destStore,
Machine::Connection & conn,
unsigned int & overhead,
counter & nrStepsWaiting,
counter & nrStepsCopyingTo
)
{
StorePathSet inputs;
BasicDerivation basicDrv(*step.drv);
for (auto & p : step.drv->inputSrcs)
inputs.insert(p);
for (auto & input : step.drv->inputDrvs) {
auto drv2 = localStore.readDerivation(input.first);
for (auto & name : input.second) {
if (auto i = get(drv2.outputs, name)) {
auto outPath = i->path(localStore, drv2.name, name);
inputs.insert(*outPath);
basicDrv.inputSrcs.insert(*outPath);
}
}
}
/* Ensure that the inputs exist in the destination store. This is
a no-op for regular stores, but for the binary cache store,
this will copy the inputs to the binary cache from the local
store. */
if (localStore.getUri() != destStore.getUri()) {
StorePathSet closure;
localStore.computeFSClosure(step.drv->inputSrcs, closure);
copyPaths(localStore, destStore, closure, NoRepair, NoCheckSigs, NoSubstitute);
}
{
auto mc1 = std::make_shared<MaintainCount<counter>>(nrStepsWaiting);
mc1.reset();
MaintainCount<counter> mc2(nrStepsCopyingTo);
printMsg(lvlDebug, "sending closure of %s to %s",
localStore.printStorePath(step.drvPath), conn.machine->sshName);
auto now1 = std::chrono::steady_clock::now();
/* Copy the input closure. */
if (conn.machine->isLocalhost()) {
StorePathSet closure;
destStore.computeFSClosure(inputs, closure);
copyPaths(destStore, localStore, closure, NoRepair, NoCheckSigs, NoSubstitute);
} else {
copyClosureTo(conn.machine->state->sendLock, destStore, conn.from, conn.to, inputs, true);
}
auto now2 = std::chrono::steady_clock::now();
overhead += std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
}
return inputs;
}
void State::buildRemote(ref<Store> destStore, void State::buildRemote(ref<Store> destStore,
Machine::ptr machine, Step::ptr step, Machine::ptr machine, Step::ptr step,
unsigned int maxSilentTime, unsigned int buildTimeout, unsigned int repeats, unsigned int maxSilentTime, unsigned int buildTimeout, unsigned int repeats,
@ -230,33 +311,21 @@ void State::buildRemote(ref<Store> destStore,
process. Meh. */ process. Meh. */
}); });
FdSource from(child.from.get()); Machine::Connection conn;
FdSink to(child.to.get()); conn.from = child.from.get();
conn.to = child.to.get();
conn.machine = machine;
Finally updateStats([&]() { Finally updateStats([&]() {
bytesReceived += from.read; bytesReceived += conn.from.read;
bytesSent += to.written; bytesSent += conn.to.written;
}); });
/* Handshake. */
unsigned int remoteVersion;
try { try {
to << SERVE_MAGIC_1 << 0x204; handshake(conn, repeats);
to.flush();
unsigned int magic = readInt(from);
if (magic != SERVE_MAGIC_2)
throw Error("protocol mismatch with nix-store --serve on %1%", machine->sshName);
remoteVersion = readInt(from);
if (GET_PROTOCOL_MAJOR(remoteVersion) != 0x200)
throw Error("unsupported nix-store --serve protocol version on %1%", machine->sshName);
if (GET_PROTOCOL_MINOR(remoteVersion) < 3 && repeats > 0)
throw Error("machine %1% does not support repeating a build; please upgrade it to Nix 1.12", machine->sshName);
} catch (EndOfFile & e) { } catch (EndOfFile & e) {
child.pid.wait(); child.pid.wait();
string s = chomp(readFile(result.logFile)); std::string s = chomp(readFile(result.logFile));
throw Error("cannot connect to %1%: %2%", machine->sshName, s); throw Error("cannot connect to %1%: %2%", machine->sshName, s);
} }
@ -272,56 +341,7 @@ void State::buildRemote(ref<Store> destStore,
outputs of the input derivations. */ outputs of the input derivations. */
updateStep(ssSendingInputs); updateStep(ssSendingInputs);
StorePathSet inputs; StorePathSet inputs = sendInputs(*this, *step, *localStore, *destStore, conn, result.overhead, nrStepsWaiting, nrStepsCopyingTo);
BasicDerivation basicDrv(*step->drv);
for (auto & p : step->drv->inputSrcs)
inputs.insert(p);
for (auto & input : step->drv->inputDrvs) {
auto drv2 = localStore->readDerivation(input.first);
for (auto & name : input.second) {
if (auto i = get(drv2.outputs, name)) {
auto outPath = i->path(*localStore, drv2.name, name);
inputs.insert(*outPath);
basicDrv.inputSrcs.insert(*outPath);
}
}
}
/* Ensure that the inputs exist in the destination store. This is
a no-op for regular stores, but for the binary cache store,
this will copy the inputs to the binary cache from the local
store. */
if (localStore != std::shared_ptr<Store>(destStore)) {
StorePathSet closure;
localStore->computeFSClosure(step->drv->inputSrcs, closure);
copyPaths(*localStore, *destStore, closure, NoRepair, NoCheckSigs, NoSubstitute);
}
{
auto mc1 = std::make_shared<MaintainCount<counter>>(nrStepsWaiting);
mc1.reset();
MaintainCount<counter> mc2(nrStepsCopyingTo);
printMsg(lvlDebug, "sending closure of %s to %s",
localStore->printStorePath(step->drvPath), machine->sshName);
auto now1 = std::chrono::steady_clock::now();
/* Copy the input closure. */
if (machine->isLocalhost()) {
StorePathSet closure;
destStore->computeFSClosure(inputs, closure);
copyPaths(*destStore, *localStore, closure, NoRepair, NoCheckSigs, NoSubstitute);
} else {
copyClosureTo(machine->state->sendLock, *destStore, from, to, inputs, true);
}
auto now2 = std::chrono::steady_clock::now();
result.overhead += std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
}
logFileDel.cancel(); logFileDel.cancel();
@ -342,31 +362,31 @@ void State::buildRemote(ref<Store> destStore,
updateStep(ssBuilding); updateStep(ssBuilding);
to << cmdBuildDerivation << localStore->printStorePath(step->drvPath); conn.to << cmdBuildDerivation << localStore->printStorePath(step->drvPath);
writeDerivation(to, *localStore, basicDrv); writeDerivation(conn.to, *localStore, BasicDerivation(*step->drv));
to << maxSilentTime << buildTimeout; conn.to << maxSilentTime << buildTimeout;
if (GET_PROTOCOL_MINOR(remoteVersion) >= 2) if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 2)
to << maxLogSize; conn.to << maxLogSize;
if (GET_PROTOCOL_MINOR(remoteVersion) >= 3) { if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 3) {
to << repeats // == build-repeat conn.to << repeats // == build-repeat
<< step->isDeterministic; // == enforce-determinism << step->isDeterministic; // == enforce-determinism
} }
to.flush(); conn.to.flush();
result.startTime = time(0); result.startTime = time(0);
int res; int res;
{ {
MaintainCount<counter> mc(nrStepsBuilding); MaintainCount<counter> mc(nrStepsBuilding);
res = readInt(from); res = readInt(conn.from);
} }
result.stopTime = time(0); result.stopTime = time(0);
result.errorMsg = readString(from); result.errorMsg = readString(conn.from);
if (GET_PROTOCOL_MINOR(remoteVersion) >= 3) { if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 3) {
result.timesBuilt = readInt(from); result.timesBuilt = readInt(conn.from);
result.isNonDeterministic = readInt(from); result.isNonDeterministic = readInt(conn.from);
auto start = readInt(from); auto start = readInt(conn.from);
auto stop = readInt(from); auto stop = readInt(conn.from);
if (start && start) { if (start && start) {
/* Note: this represents the duration of a single /* Note: this represents the duration of a single
round, rather than all rounds. */ round, rather than all rounds. */
@ -374,8 +394,8 @@ void State::buildRemote(ref<Store> destStore,
result.stopTime = stop; result.stopTime = stop;
} }
} }
if (GET_PROTOCOL_MINOR(remoteVersion) >= 6) { if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 6) {
worker_proto::read(*localStore, from, Phantom<DrvOutputs> {}); worker_proto::read(*localStore, conn.from, Phantom<DrvOutputs> {});
} }
switch ((BuildResult::Status) res) { switch ((BuildResult::Status) res) {
case BuildResult::Built: case BuildResult::Built:
@ -451,19 +471,19 @@ void State::buildRemote(ref<Store> destStore,
/* Get info about each output path. */ /* Get info about each output path. */
std::map<StorePath, ValidPathInfo> infos; std::map<StorePath, ValidPathInfo> infos;
size_t totalNarSize = 0; size_t totalNarSize = 0;
to << cmdQueryPathInfos; conn.to << cmdQueryPathInfos;
worker_proto::write(*localStore, to, outputs); worker_proto::write(*localStore, conn.to, outputs);
to.flush(); conn.to.flush();
while (true) { while (true) {
auto storePathS = readString(from); auto storePathS = readString(conn.from);
if (storePathS == "") break; if (storePathS == "") break;
auto deriver = readString(from); // deriver auto deriver = readString(conn.from); // deriver
auto references = worker_proto::read(*localStore, from, Phantom<StorePathSet> {}); auto references = worker_proto::read(*localStore, conn.from, Phantom<StorePathSet> {});
readLongLong(from); // download size readLongLong(conn.from); // download size
auto narSize = readLongLong(from); auto narSize = readLongLong(conn.from);
auto narHash = Hash::parseAny(readString(from), htSHA256); auto narHash = Hash::parseAny(readString(conn.from), htSHA256);
auto ca = parseContentAddressOpt(readString(from)); auto ca = parseContentAddressOpt(readString(conn.from));
readStrings<StringSet>(from); // sigs readStrings<StringSet>(conn.from); // sigs
ValidPathInfo info(localStore->parseStorePath(storePathS), narHash); ValidPathInfo info(localStore->parseStorePath(storePathS), narHash);
assert(outputs.count(info.path)); assert(outputs.count(info.path));
info.references = references; info.references = references;
@ -502,10 +522,10 @@ void State::buildRemote(ref<Store> destStore,
lambda function only gets executed if someone tries to read lambda function only gets executed if someone tries to read
from source2, we will send the command from here rather from source2, we will send the command from here rather
than outside the lambda. */ than outside the lambda. */
to << cmdDumpStorePath << localStore->printStorePath(path); conn.to << cmdDumpStorePath << localStore->printStorePath(path);
to.flush(); conn.to.flush();
TeeSource tee(from, sink); TeeSource tee(conn.from, sink);
extractNarData(tee, localStore->printStorePath(path), narMembers); extractNarData(tee, localStore->printStorePath(path), narMembers);
}); });