diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index 2e258484..360a8ef7 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -188,6 +188,87 @@ std::pair openLogFile(const std::string & logDir, const Store return {std::move(logFile), std::move(logFD)}; } +void handshake(Machine::Connection & conn, unsigned int repeats) +{ + conn.to << SERVE_MAGIC_1 << 0x204; + conn.to.flush(); + + unsigned int magic = readInt(conn.from); + if (magic != SERVE_MAGIC_2) + throw Error("protocol mismatch with ‘nix-store --serve’ on ‘%1%’", conn.machine->sshName); + conn.remoteVersion = readInt(conn.from); + if (GET_PROTOCOL_MAJOR(conn.remoteVersion) != 0x200) + throw Error("unsupported ‘nix-store --serve’ protocol version on ‘%1%’", conn.machine->sshName); + if (GET_PROTOCOL_MINOR(conn.remoteVersion) < 3 && repeats > 0) + throw Error("machine ‘%1%’ does not support repeating a build; please upgrade it to Nix 1.12", conn.machine->sshName); +} + +StorePathSet sendInputs( + State & state, + Step & step, + Store & localStore, + Store & destStore, + Machine::Connection & conn, + unsigned int & overhead, + counter & nrStepsWaiting, + counter & nrStepsCopyingTo +) +{ + + StorePathSet inputs; + BasicDerivation basicDrv(*step.drv); + + for (auto & p : step.drv->inputSrcs) + inputs.insert(p); + + for (auto & input : step.drv->inputDrvs) { + auto drv2 = localStore.readDerivation(input.first); + for (auto & name : input.second) { + if (auto i = get(drv2.outputs, name)) { + auto outPath = i->path(localStore, drv2.name, name); + inputs.insert(*outPath); + basicDrv.inputSrcs.insert(*outPath); + } + } + } + + /* Ensure that the inputs exist in the destination store. This is + a no-op for regular stores, but for the binary cache store, + this will copy the inputs to the binary cache from the local + store. */ + if (localStore.getUri() != destStore.getUri()) { + StorePathSet closure; + localStore.computeFSClosure(step.drv->inputSrcs, closure); + copyPaths(localStore, destStore, closure, NoRepair, NoCheckSigs, NoSubstitute); + } + + { + auto mc1 = std::make_shared>(nrStepsWaiting); + mc1.reset(); + MaintainCount mc2(nrStepsCopyingTo); + + printMsg(lvlDebug, "sending closure of ‘%s’ to ‘%s’", + localStore.printStorePath(step.drvPath), conn.machine->sshName); + + auto now1 = std::chrono::steady_clock::now(); + + /* Copy the input closure. */ + if (conn.machine->isLocalhost()) { + StorePathSet closure; + destStore.computeFSClosure(inputs, closure); + copyPaths(destStore, localStore, closure, NoRepair, NoCheckSigs, NoSubstitute); + } else { + copyClosureTo(conn.machine->state->sendLock, destStore, conn.from, conn.to, inputs, true); + } + + auto now2 = std::chrono::steady_clock::now(); + + overhead += std::chrono::duration_cast(now2 - now1).count(); + } + + return inputs; +} + void State::buildRemote(ref destStore, Machine::ptr machine, Step::ptr step, unsigned int maxSilentTime, unsigned int buildTimeout, unsigned int repeats, @@ -230,33 +311,21 @@ void State::buildRemote(ref destStore, process. Meh. */ }); - FdSource from(child.from.get()); - FdSink to(child.to.get()); + Machine::Connection conn; + conn.from = child.from.get(); + conn.to = child.to.get(); + conn.machine = machine; Finally updateStats([&]() { - bytesReceived += from.read; - bytesSent += to.written; + bytesReceived += conn.from.read; + bytesSent += conn.to.written; }); - /* Handshake. */ - unsigned int remoteVersion; - try { - to << SERVE_MAGIC_1 << 0x204; - to.flush(); - - unsigned int magic = readInt(from); - if (magic != SERVE_MAGIC_2) - throw Error("protocol mismatch with ‘nix-store --serve’ on ‘%1%’", machine->sshName); - remoteVersion = readInt(from); - if (GET_PROTOCOL_MAJOR(remoteVersion) != 0x200) - throw Error("unsupported ‘nix-store --serve’ protocol version on ‘%1%’", machine->sshName); - if (GET_PROTOCOL_MINOR(remoteVersion) < 3 && repeats > 0) - throw Error("machine ‘%1%’ does not support repeating a build; please upgrade it to Nix 1.12", machine->sshName); - + handshake(conn, repeats); } catch (EndOfFile & e) { child.pid.wait(); - string s = chomp(readFile(result.logFile)); + std::string s = chomp(readFile(result.logFile)); throw Error("cannot connect to ‘%1%’: %2%", machine->sshName, s); } @@ -272,61 +341,12 @@ void State::buildRemote(ref destStore, outputs of the input derivations. */ updateStep(ssSendingInputs); - StorePathSet inputs; - BasicDerivation basicDrv(*step->drv); - - for (auto & p : step->drv->inputSrcs) - inputs.insert(p); - - for (auto & input : step->drv->inputDrvs) { - auto drv2 = localStore->readDerivation(input.first); - for (auto & name : input.second) { - if (auto i = get(drv2.outputs, name)) { - auto outPath = i->path(*localStore, drv2.name, name); - inputs.insert(*outPath); - basicDrv.inputSrcs.insert(*outPath); - } - } - } - - /* Ensure that the inputs exist in the destination store. This is - a no-op for regular stores, but for the binary cache store, - this will copy the inputs to the binary cache from the local - store. */ - if (localStore != std::shared_ptr(destStore)) { - StorePathSet closure; - localStore->computeFSClosure(step->drv->inputSrcs, closure); - copyPaths(*localStore, *destStore, closure, NoRepair, NoCheckSigs, NoSubstitute); - } - - { - auto mc1 = std::make_shared>(nrStepsWaiting); - mc1.reset(); - MaintainCount mc2(nrStepsCopyingTo); - - printMsg(lvlDebug, "sending closure of ‘%s’ to ‘%s’", - localStore->printStorePath(step->drvPath), machine->sshName); - - auto now1 = std::chrono::steady_clock::now(); - - /* Copy the input closure. */ - if (machine->isLocalhost()) { - StorePathSet closure; - destStore->computeFSClosure(inputs, closure); - copyPaths(*destStore, *localStore, closure, NoRepair, NoCheckSigs, NoSubstitute); - } else { - copyClosureTo(machine->state->sendLock, *destStore, from, to, inputs, true); - } - - auto now2 = std::chrono::steady_clock::now(); - - result.overhead += std::chrono::duration_cast(now2 - now1).count(); - } + StorePathSet inputs = sendInputs(*this, *step, *localStore, *destStore, conn, result.overhead, nrStepsWaiting, nrStepsCopyingTo); logFileDel.cancel(); /* Truncate the log to get rid of messages about substitutions - etc. on the remote system. */ + etc. on the remote system. */ if (lseek(logFD.get(), SEEK_SET, 0) != 0) throw SysError("seeking to the start of log file ‘%s’", result.logFile); @@ -342,31 +362,31 @@ void State::buildRemote(ref destStore, updateStep(ssBuilding); - to << cmdBuildDerivation << localStore->printStorePath(step->drvPath); - writeDerivation(to, *localStore, basicDrv); - to << maxSilentTime << buildTimeout; - if (GET_PROTOCOL_MINOR(remoteVersion) >= 2) - to << maxLogSize; - if (GET_PROTOCOL_MINOR(remoteVersion) >= 3) { - to << repeats // == build-repeat + conn.to << cmdBuildDerivation << localStore->printStorePath(step->drvPath); + writeDerivation(conn.to, *localStore, BasicDerivation(*step->drv)); + conn.to << maxSilentTime << buildTimeout; + if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 2) + conn.to << maxLogSize; + if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 3) { + conn.to << repeats // == build-repeat << step->isDeterministic; // == enforce-determinism } - to.flush(); + conn.to.flush(); result.startTime = time(0); int res; { MaintainCount mc(nrStepsBuilding); - res = readInt(from); + res = readInt(conn.from); } result.stopTime = time(0); - result.errorMsg = readString(from); - if (GET_PROTOCOL_MINOR(remoteVersion) >= 3) { - result.timesBuilt = readInt(from); - result.isNonDeterministic = readInt(from); - auto start = readInt(from); - auto stop = readInt(from); + result.errorMsg = readString(conn.from); + if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 3) { + result.timesBuilt = readInt(conn.from); + result.isNonDeterministic = readInt(conn.from); + auto start = readInt(conn.from); + auto stop = readInt(conn.from); if (start && start) { /* Note: this represents the duration of a single round, rather than all rounds. */ @@ -374,8 +394,8 @@ void State::buildRemote(ref destStore, result.stopTime = stop; } } - if (GET_PROTOCOL_MINOR(remoteVersion) >= 6) { - worker_proto::read(*localStore, from, Phantom {}); + if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 6) { + worker_proto::read(*localStore, conn.from, Phantom {}); } switch ((BuildResult::Status) res) { case BuildResult::Built: @@ -451,19 +471,19 @@ void State::buildRemote(ref destStore, /* Get info about each output path. */ std::map infos; size_t totalNarSize = 0; - to << cmdQueryPathInfos; - worker_proto::write(*localStore, to, outputs); - to.flush(); + conn.to << cmdQueryPathInfos; + worker_proto::write(*localStore, conn.to, outputs); + conn.to.flush(); while (true) { - auto storePathS = readString(from); + auto storePathS = readString(conn.from); if (storePathS == "") break; - auto deriver = readString(from); // deriver - auto references = worker_proto::read(*localStore, from, Phantom {}); - readLongLong(from); // download size - auto narSize = readLongLong(from); - auto narHash = Hash::parseAny(readString(from), htSHA256); - auto ca = parseContentAddressOpt(readString(from)); - readStrings(from); // sigs + auto deriver = readString(conn.from); // deriver + auto references = worker_proto::read(*localStore, conn.from, Phantom {}); + readLongLong(conn.from); // download size + auto narSize = readLongLong(conn.from); + auto narHash = Hash::parseAny(readString(conn.from), htSHA256); + auto ca = parseContentAddressOpt(readString(conn.from)); + readStrings(conn.from); // sigs ValidPathInfo info(localStore->parseStorePath(storePathS), narHash); assert(outputs.count(info.path)); info.references = references; @@ -502,10 +522,10 @@ void State::buildRemote(ref destStore, lambda function only gets executed if someone tries to read from source2, we will send the command from here rather than outside the lambda. */ - to << cmdDumpStorePath << localStore->printStorePath(path); - to.flush(); + conn.to << cmdDumpStorePath << localStore->printStorePath(path); + conn.to.flush(); - TeeSource tee(from, sink); + TeeSource tee(conn.from, sink); extractNarData(tee, localStore->printStorePath(path), narMembers); });