diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index f3576a0c..c0aa0712 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -130,264 +130,266 @@ void State::buildRemote(ref destStore, nix::Path tmpDir = createTempDir(); AutoDelete tmpDirDel(tmpDir, true); - Child child; - openConnection(machine, tmpDir, logFD, child); - - logFD.close(); - - FdSource from(child.from); - FdSink to(child.to); - - Finally updateStats([&]() { - bytesReceived += from.read; - bytesSent += to.written; - }); - - /* Handshake. */ - bool sendDerivation = true; - unsigned int remoteVersion; - try { - to << SERVE_MAGIC_1 << 0x202; - to.flush(); - unsigned int magic = readInt(from); - if (magic != SERVE_MAGIC_2) - throw Error(format("protocol mismatch with ‘nix-store --serve’ on ‘%1%’") % machine->sshName); - remoteVersion = readInt(from); - if (GET_PROTOCOL_MAJOR(remoteVersion) != 0x200) - throw Error(format("unsupported ‘nix-store --serve’ protocol version on ‘%1%’") % machine->sshName); - if (GET_PROTOCOL_MINOR(remoteVersion) >= 1) - sendDerivation = false; + Child child; + openConnection(machine, tmpDir, logFD, child); - } catch (EndOfFile & e) { - child.pid.wait(true); + logFD.close(); + + FdSource from(child.from); + FdSink to(child.to); + + Finally updateStats([&]() { + bytesReceived += from.read; + bytesSent += to.written; + }); + + /* Handshake. */ + bool sendDerivation = true; + unsigned int remoteVersion; + + try { + to << SERVE_MAGIC_1 << 0x202; + to.flush(); + + unsigned int magic = readInt(from); + if (magic != SERVE_MAGIC_2) + throw Error(format("protocol mismatch with ‘nix-store --serve’ on ‘%1%’") % machine->sshName); + remoteVersion = readInt(from); + if (GET_PROTOCOL_MAJOR(remoteVersion) != 0x200) + throw Error(format("unsupported ‘nix-store --serve’ protocol version on ‘%1%’") % machine->sshName); + if (GET_PROTOCOL_MINOR(remoteVersion) >= 1) + sendDerivation = false; + + } catch (EndOfFile & e) { + child.pid.wait(true); + string s = chomp(readFile(result.logFile)); + throw Error(format("cannot connect to ‘%1%’: %2%") % machine->sshName % s); + } { - /* Disable this machine until a certain period of time has - passed. This period increases on every consecutive - failure. However, don't count failures that occurred - soon after the last one (to take into account steps - started in parallel). */ auto info(machine->state->connectInfo.lock()); - auto now = std::chrono::system_clock::now(); - if (info->consecutiveFailures == 0 || info->lastFailure < now - std::chrono::seconds(30)) { - info->consecutiveFailures = std::min(info->consecutiveFailures + 1, (unsigned int) 4); - info->lastFailure = now; - int delta = retryInterval * powf(retryBackoff, info->consecutiveFailures - 1) + (rand() % 30); - printMsg(lvlInfo, format("will disable machine ‘%1%’ for %2%s") % machine->sshName % delta); - info->disabledUntil = now + std::chrono::seconds(delta); + info->consecutiveFailures = 0; + } + + /* Gather the inputs. If the remote side is Nix <= 1.9, we have to + copy the entire closure of ‘drvPath’, as well as the required + outputs of the input derivations. On Nix > 1.9, we only need to + copy the immediate sources of the derivation and the required + outputs of the input derivations. */ + PathSet inputs; + BasicDerivation basicDrv(step->drv); + + if (sendDerivation) + inputs.insert(step->drvPath); + else + for (auto & p : step->drv.inputSrcs) + inputs.insert(p); + + for (auto & input : step->drv.inputDrvs) { + Derivation drv2 = readDerivation(input.first); + for (auto & name : input.second) { + auto i = drv2.outputs.find(name); + if (i == drv2.outputs.end()) continue; + inputs.insert(i->second.path); + basicDrv.inputSrcs.insert(i->second.path); } } - string s = chomp(readFile(result.logFile)); - throw Error(format("cannot connect to ‘%1%’: %2%") % machine->sshName % s); - } + /* Ensure that the inputs exist in the destination store. This is + a no-op for regular stores, but for the binary cache store, + this will copy the inputs to the binary cache from the local + store. */ + destStore->buildPaths(basicDrv.inputSrcs); - { + /* Copy the input closure. */ + if (/* machine->sshName != "localhost" */ true) { + auto mc1 = std::make_shared(nrStepsWaiting); + std::lock_guard sendLock(machine->state->sendLock); + mc1.reset(); + MaintainCount mc2(nrStepsCopyingTo); + printMsg(lvlDebug, format("sending closure of ‘%1%’ to ‘%2%’") % step->drvPath % machine->sshName); + + auto now1 = std::chrono::steady_clock::now(); + + copyClosureTo(destStore, from, to, inputs, true); + + auto now2 = std::chrono::steady_clock::now(); + + result.overhead += std::chrono::duration_cast(now2 - now1).count(); + } + + autoDelete.cancel(); + + /* Do the build. */ + printMsg(lvlDebug, format("building ‘%1%’ on ‘%2%’") % step->drvPath % machine->sshName); + + if (sendDerivation) + to << cmdBuildPaths << PathSet({step->drvPath}); + else + to << cmdBuildDerivation << step->drvPath << basicDrv; + to << maxSilentTime << buildTimeout; + if (GET_PROTOCOL_MINOR(remoteVersion) >= 2) + to << 64 * 1024 * 1024; // == maxLogSize + to.flush(); + + result.startTime = time(0); + int res; + { + MaintainCount mc(nrStepsBuilding); + res = readInt(from); + } + result.stopTime = time(0); + + if (sendDerivation) { + if (res) { + result.errorMsg = (format("%1% on ‘%2%’") % readString(from) % machine->sshName).str(); + if (res == 100) { + result.stepStatus = bsFailed; + result.canCache = true; + } + else if (res == 101) { + result.stepStatus = bsTimedOut; + } + else { + result.stepStatus = bsAborted; + result.canRetry = true; + } + return; + } + result.stepStatus = bsSuccess; + } else { + result.errorMsg = readString(from); + switch ((BuildResult::Status) res) { + case BuildResult::Built: + result.stepStatus = bsSuccess; + break; + case BuildResult::Substituted: + case BuildResult::AlreadyValid: + result.stepStatus = bsSuccess; + result.isCached = true; + break; + case BuildResult::PermanentFailure: + result.stepStatus = bsFailed; + result.canCache = true; + result.errorMsg = ""; + break; + case BuildResult::InputRejected: + case BuildResult::OutputRejected: + result.stepStatus = bsFailed; + result.canCache = true; + break; + case BuildResult::TransientFailure: + result.stepStatus = bsFailed; + result.canRetry = true; + result.errorMsg = ""; + break; + case BuildResult::CachedFailure: // cached on the build machine + result.stepStatus = bsCachedFailure; + result.canCache = true; + result.errorMsg = ""; + break; + case BuildResult::TimedOut: + result.stepStatus = bsTimedOut; + result.errorMsg = ""; + break; + case BuildResult::MiscFailure: + result.stepStatus = bsAborted; + result.canRetry = true; + break; + case BuildResult::LogLimitExceeded: + result.stepStatus = bsLogLimitExceeded; + break; + default: + result.stepStatus = bsAborted; + break; + } + if (result.stepStatus != bsSuccess) return; + } + + result.errorMsg = ""; + + /* If the path was substituted or already valid, then we didn't + get a build log. */ + if (result.isCached) { + printMsg(lvlInfo, format("outputs of ‘%1%’ substituted or already valid on ‘%2%’") % step->drvPath % machine->sshName); + unlink(result.logFile.c_str()); + result.logFile = ""; + } + + /* Copy the output paths. */ + if (/* machine->sshName != "localhost" */ true) { + MaintainCount mc(nrStepsCopyingFrom); + + auto now1 = std::chrono::steady_clock::now(); + + PathSet outputs; + for (auto & output : step->drv.outputs) + outputs.insert(output.second.path); + + /* Query the size of the output paths. */ + size_t totalNarSize = 0; + to << cmdQueryPathInfos << outputs; + to.flush(); + while (true) { + if (readString(from) == "") break; + readString(from); // deriver + readStrings(from); // references + readLongLong(from); // download size + totalNarSize += readLongLong(from); + } + + if (totalNarSize > maxOutputSize) { + result.stepStatus = bsNarSizeLimitExceeded; + return; + } + + printMsg(lvlDebug, format("copying outputs of ‘%s’ from ‘%s’ (%d bytes)") + % step->drvPath % machine->sshName % totalNarSize); + + /* Block until we have the required amount of memory + available. FIXME: only need this for binary cache + destination stores. */ + auto resStart = std::chrono::steady_clock::now(); + auto memoryReservation(memoryTokens.get(totalNarSize)); + auto resStop = std::chrono::steady_clock::now(); + + auto resMs = std::chrono::duration_cast(resStop - resStart).count(); + if (resMs >= 1000) + printMsg(lvlError, format("warning: had to wait %d ms for %d memory tokens for %s") + % resMs % totalNarSize % step->drvPath); + + result.accessor = destStore->getFSAccessor(); + + to << cmdExportPaths << 0 << outputs; + to.flush(); + destStore->importPaths(false, from, result.accessor); + + auto now2 = std::chrono::steady_clock::now(); + + result.overhead += std::chrono::duration_cast(now2 - now1).count(); + } + + /* Shut down the connection. */ + child.to.close(); + child.pid.wait(true); + + } catch (Error & e) { + /* Disable this machine until a certain period of time has + passed. This period increases on every consecutive + failure. However, don't count failures that occurred soon + after the last one (to take into account steps started in + parallel). */ auto info(machine->state->connectInfo.lock()); - info->consecutiveFailures = 0; - } - - /* Gather the inputs. If the remote side is Nix <= 1.9, we have to - copy the entire closure of ‘drvPath’, as well as the required - outputs of the input derivations. On Nix > 1.9, we only need to - copy the immediate sources of the derivation and the required - outputs of the input derivations. */ - PathSet inputs; - BasicDerivation basicDrv(step->drv); - - if (sendDerivation) - inputs.insert(step->drvPath); - else - for (auto & p : step->drv.inputSrcs) - inputs.insert(p); - - for (auto & input : step->drv.inputDrvs) { - Derivation drv2 = readDerivation(input.first); - for (auto & name : input.second) { - auto i = drv2.outputs.find(name); - if (i == drv2.outputs.end()) continue; - inputs.insert(i->second.path); - basicDrv.inputSrcs.insert(i->second.path); + auto now = std::chrono::system_clock::now(); + if (info->consecutiveFailures == 0 || info->lastFailure < now - std::chrono::seconds(30)) { + info->consecutiveFailures = std::min(info->consecutiveFailures + 1, (unsigned int) 4); + info->lastFailure = now; + int delta = retryInterval * powf(retryBackoff, info->consecutiveFailures - 1) + (rand() % 30); + printMsg(lvlInfo, format("will disable machine ‘%1%’ for %2%s") % machine->sshName % delta); + info->disabledUntil = now + std::chrono::seconds(delta); } + throw; } - - /* Ensure that the inputs exist in the destination store. This is - a no-op for regular stores, but for the binary cache store, - this will copy the inputs to the binary cache from the local - store. */ - destStore->buildPaths(basicDrv.inputSrcs); - - /* Copy the input closure. */ - if (/* machine->sshName != "localhost" */ true) { - auto mc1 = std::make_shared(nrStepsWaiting); - std::lock_guard sendLock(machine->state->sendLock); - mc1.reset(); - MaintainCount mc2(nrStepsCopyingTo); - printMsg(lvlDebug, format("sending closure of ‘%1%’ to ‘%2%’") % step->drvPath % machine->sshName); - - auto now1 = std::chrono::steady_clock::now(); - - copyClosureTo(destStore, from, to, inputs, true); - - auto now2 = std::chrono::steady_clock::now(); - - result.overhead += std::chrono::duration_cast(now2 - now1).count(); - } - - autoDelete.cancel(); - - /* Do the build. */ - printMsg(lvlDebug, format("building ‘%1%’ on ‘%2%’") % step->drvPath % machine->sshName); - - if (sendDerivation) - to << cmdBuildPaths << PathSet({step->drvPath}); - else - to << cmdBuildDerivation << step->drvPath << basicDrv; - to << maxSilentTime << buildTimeout; - if (GET_PROTOCOL_MINOR(remoteVersion) >= 2) - to << 64 * 1024 * 1024; // == maxLogSize - to.flush(); - - result.startTime = time(0); - int res; - { - MaintainCount mc(nrStepsBuilding); - res = readInt(from); - } - result.stopTime = time(0); - - if (sendDerivation) { - if (res) { - result.errorMsg = (format("%1% on ‘%2%’") % readString(from) % machine->sshName).str(); - if (res == 100) { - result.stepStatus = bsFailed; - result.canCache = true; - } - else if (res == 101) { - result.stepStatus = bsTimedOut; - } - else { - result.stepStatus = bsAborted; - result.canRetry = true; - } - return; - } - result.stepStatus = bsSuccess; - } else { - result.errorMsg = readString(from); - switch ((BuildResult::Status) res) { - case BuildResult::Built: - result.stepStatus = bsSuccess; - break; - case BuildResult::Substituted: - case BuildResult::AlreadyValid: - result.stepStatus = bsSuccess; - result.isCached = true; - break; - case BuildResult::PermanentFailure: - result.stepStatus = bsFailed; - result.canCache = true; - result.errorMsg = ""; - break; - case BuildResult::InputRejected: - case BuildResult::OutputRejected: - result.stepStatus = bsFailed; - result.canCache = true; - break; - case BuildResult::TransientFailure: - result.stepStatus = bsFailed; - result.canRetry = true; - result.errorMsg = ""; - break; - case BuildResult::CachedFailure: // cached on the build machine - result.stepStatus = bsCachedFailure; - result.canCache = true; - result.errorMsg = ""; - break; - case BuildResult::TimedOut: - result.stepStatus = bsTimedOut; - result.errorMsg = ""; - break; - case BuildResult::MiscFailure: - result.stepStatus = bsAborted; - result.canRetry = true; - break; - case BuildResult::LogLimitExceeded: - result.stepStatus = bsLogLimitExceeded; - break; - default: - result.stepStatus = bsAborted; - break; - } - if (result.stepStatus != bsSuccess) return; - } - - result.errorMsg = ""; - - /* If the path was substituted or already valid, then we didn't - get a build log. */ - if (result.isCached) { - printMsg(lvlInfo, format("outputs of ‘%1%’ substituted or already valid on ‘%2%’") % step->drvPath % machine->sshName); - unlink(result.logFile.c_str()); - result.logFile = ""; - } - - /* Copy the output paths. */ - if (/* machine->sshName != "localhost" */ true) { - MaintainCount mc(nrStepsCopyingFrom); - - auto now1 = std::chrono::steady_clock::now(); - - PathSet outputs; - for (auto & output : step->drv.outputs) - outputs.insert(output.second.path); - - /* Query the size of the output paths. */ - size_t totalNarSize = 0; - to << cmdQueryPathInfos << outputs; - to.flush(); - while (true) { - if (readString(from) == "") break; - readString(from); // deriver - readStrings(from); // references - readLongLong(from); // download size - totalNarSize += readLongLong(from); - } - - if (totalNarSize > maxOutputSize) { - result.stepStatus = bsNarSizeLimitExceeded; - return; - } - - printMsg(lvlDebug, format("copying outputs of ‘%s’ from ‘%s’ (%d bytes)") - % step->drvPath % machine->sshName % totalNarSize); - - /* Block until we have the required amount of memory - available. FIXME: only need this for binary cache - destination stores. */ - auto resStart = std::chrono::steady_clock::now(); - auto memoryReservation(memoryTokens.get(totalNarSize)); - auto resStop = std::chrono::steady_clock::now(); - - auto resMs = std::chrono::duration_cast(resStop - resStart).count(); - if (resMs >= 1000) - printMsg(lvlError, format("warning: had to wait %d ms for %d memory tokens for %s") - % resMs % totalNarSize % step->drvPath); - - result.accessor = destStore->getFSAccessor(); - - to << cmdExportPaths << 0 << outputs; - to.flush(); - destStore->importPaths(false, from, result.accessor); - - auto now2 = std::chrono::steady_clock::now(); - - result.overhead += std::chrono::duration_cast(now2 - now1).count(); - } - - /* Shut down the connection. */ - child.to.close(); - child.pid.wait(true); }