Merge pull request #1313 from obsidiansystems/split-buildRemote

Split the `buildRemote` function, take 2
This commit is contained in:
John Ericson 2023-12-04 11:37:36 -05:00 committed by GitHub
commit a5d44b60ea
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 380 additions and 250 deletions

View file

@ -32,6 +32,8 @@ static void append(Strings & dst, const Strings & src)
dst.insert(dst.end(), src.begin(), src.end()); dst.insert(dst.end(), src.begin(), src.end());
} }
namespace nix::build_remote {
static Strings extraStoreArgs(std::string & machine) static Strings extraStoreArgs(std::string & machine)
{ {
Strings result; Strings result;
@ -107,33 +109,27 @@ static void openConnection(Machine::ptr machine, Path tmpDir, int stderrFD, Chil
} }
static void copyClosureTo(std::timed_mutex & sendMutex, Store & destStore, static void copyClosureTo(
FdSource & from, FdSink & to, ServeProto::Version remoteVersion, const StorePathSet & paths, Machine::Connection & conn,
Store & destStore,
const StorePathSet & paths,
bool useSubstitutes = false) bool useSubstitutes = false)
{ {
StorePathSet closure; StorePathSet closure;
destStore.computeFSClosure(paths, closure); destStore.computeFSClosure(paths, closure);
ServeProto::WriteConn wconn {
.to = to,
.version = remoteVersion,
};
ServeProto::ReadConn rconn {
.from = from,
.version = remoteVersion,
};
/* Send the "query valid paths" command with the "lock" option /* Send the "query valid paths" command with the "lock" option
enabled. This prevents a race where the remote host enabled. This prevents a race where the remote host
garbage-collect paths that are already there. Optionally, ask garbage-collect paths that are already there. Optionally, ask
the remote host to substitute missing paths. */ the remote host to substitute missing paths. */
// FIXME: substitute output pollutes our build log // FIXME: substitute output pollutes our build log
to << ServeProto::Command::QueryValidPaths << 1 << useSubstitutes; conn.to << ServeProto::Command::QueryValidPaths << 1 << useSubstitutes;
ServeProto::write(destStore, wconn, closure); ServeProto::write(destStore, conn, closure);
to.flush(); conn.to.flush();
/* Get back the set of paths that are already valid on the remote /* Get back the set of paths that are already valid on the remote
host. */ host. */
auto present = ServeProto::Serialise<StorePathSet>::read(destStore, rconn); auto present = ServeProto::Serialise<StorePathSet>::read(destStore, conn);
if (present.size() == closure.size()) return; if (present.size() == closure.size()) return;
@ -145,20 +141,20 @@ static void copyClosureTo(std::timed_mutex & sendMutex, Store & destStore,
printMsg(lvlDebug, "sending %d missing paths", missing.size()); printMsg(lvlDebug, "sending %d missing paths", missing.size());
std::unique_lock<std::timed_mutex> sendLock(sendMutex, std::unique_lock<std::timed_mutex> sendLock(conn.machine->state->sendLock,
std::chrono::seconds(600)); std::chrono::seconds(600));
to << ServeProto::Command::ImportPaths; conn.to << ServeProto::Command::ImportPaths;
destStore.exportPaths(missing, to); destStore.exportPaths(missing, conn.to);
to.flush(); conn.to.flush();
if (readInt(from) != 1) if (readInt(conn.from) != 1)
throw Error("remote machine failed to import closure"); throw Error("remote machine failed to import closure");
} }
// FIXME: use Store::topoSortPaths(). // FIXME: use Store::topoSortPaths().
StorePaths reverseTopoSortPaths(const std::map<StorePath, ValidPathInfo> & paths) static StorePaths reverseTopoSortPaths(const std::map<StorePath, ValidPathInfo> & paths)
{ {
StorePaths sorted; StorePaths sorted;
StorePathSet visited; StorePathSet visited;
@ -186,24 +182,311 @@ StorePaths reverseTopoSortPaths(const std::map<StorePath, ValidPathInfo> & paths
return sorted; return sorted;
} }
static std::pair<Path, AutoCloseFD> openLogFile(const std::string & logDir, const StorePath & drvPath)
{
std::string base(drvPath.to_string());
auto logFile = logDir + "/" + std::string(base, 0, 2) + "/" + std::string(base, 2);
createDirs(dirOf(logFile));
AutoCloseFD logFD = open(logFile.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0666);
if (!logFD) throw SysError("creating log file %s", logFile);
return {std::move(logFile), std::move(logFD)};
}
/**
* @param conn is not fully initialized; it is this functions job to set
* the `remoteVersion` field after the handshake is completed.
* Therefore, no `ServeProto::Serialize` functions can be used until
* that field is set.
*/
static void handshake(Machine::Connection & conn, unsigned int repeats)
{
conn.to << SERVE_MAGIC_1 << 0x206;
conn.to.flush();
unsigned int magic = readInt(conn.from);
if (magic != SERVE_MAGIC_2)
throw Error("protocol mismatch with nix-store --serve on %1%", conn.machine->sshName);
conn.remoteVersion = readInt(conn.from);
// Now `conn` is initialized.
if (GET_PROTOCOL_MAJOR(conn.remoteVersion) != 0x200)
throw Error("unsupported nix-store --serve protocol version on %1%", conn.machine->sshName);
if (GET_PROTOCOL_MINOR(conn.remoteVersion) < 3 && repeats > 0)
throw Error("machine %1% does not support repeating a build; please upgrade it to Nix 1.12", conn.machine->sshName);
}
static BasicDerivation sendInputs(
State & state,
Step & step,
Store & localStore,
Store & destStore,
Machine::Connection & conn,
unsigned int & overhead,
counter & nrStepsWaiting,
counter & nrStepsCopyingTo
)
{
BasicDerivation basicDrv(*step.drv);
for (const auto & [drvPath, node] : step.drv->inputDrvs.map) {
auto drv2 = localStore.readDerivation(drvPath);
for (auto & name : node.value) {
if (auto i = get(drv2.outputs, name)) {
auto outPath = i->path(localStore, drv2.name, name);
basicDrv.inputSrcs.insert(*outPath);
}
}
}
/* Ensure that the inputs exist in the destination store. This is
a no-op for regular stores, but for the binary cache store,
this will copy the inputs to the binary cache from the local
store. */
if (&localStore != &destStore) {
copyClosure(localStore, destStore,
step.drv->inputSrcs,
NoRepair, NoCheckSigs, NoSubstitute);
}
{
auto mc1 = std::make_shared<MaintainCount<counter>>(nrStepsWaiting);
mc1.reset();
MaintainCount<counter> mc2(nrStepsCopyingTo);
printMsg(lvlDebug, "sending closure of %s to %s",
localStore.printStorePath(step.drvPath), conn.machine->sshName);
auto now1 = std::chrono::steady_clock::now();
/* Copy the input closure. */
if (conn.machine->isLocalhost()) {
StorePathSet closure;
destStore.computeFSClosure(basicDrv.inputSrcs, closure);
copyPaths(destStore, localStore, closure, NoRepair, NoCheckSigs, NoSubstitute);
} else {
copyClosureTo(conn, destStore, basicDrv.inputSrcs, true);
}
auto now2 = std::chrono::steady_clock::now();
overhead += std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
}
return basicDrv;
}
static BuildResult performBuild(
Machine::Connection & conn,
Store & localStore,
StorePath drvPath,
const BasicDerivation & drv,
const State::BuildOptions & options,
counter & nrStepsBuilding
)
{
BuildResult result;
conn.to << ServeProto::Command::BuildDerivation << localStore.printStorePath(drvPath);
writeDerivation(conn.to, localStore, drv);
conn.to << options.maxSilentTime << options.buildTimeout;
if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 2)
conn.to << options.maxLogSize;
if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 3) {
conn.to
<< options.repeats // == build-repeat
<< options.enforceDeterminism;
}
conn.to.flush();
result.startTime = time(0);
{
MaintainCount<counter> mc(nrStepsBuilding);
result.status = (BuildResult::Status)readInt(conn.from);
}
result.stopTime = time(0);
result.errorMsg = readString(conn.from);
if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 3) {
result.timesBuilt = readInt(conn.from);
result.isNonDeterministic = readInt(conn.from);
auto start = readInt(conn.from);
auto stop = readInt(conn.from);
if (start && start) {
/* Note: this represents the duration of a single
round, rather than all rounds. */
result.startTime = start;
result.stopTime = stop;
}
}
if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 6) {
ServeProto::Serialise<DrvOutputs>::read(localStore, conn);
}
return result;
}
static std::map<StorePath, ValidPathInfo> queryPathInfos(
Machine::Connection & conn,
Store & localStore,
StorePathSet & outputs,
size_t & totalNarSize
)
{
/* Get info about each output path. */
std::map<StorePath, ValidPathInfo> infos;
conn.to << ServeProto::Command::QueryPathInfos;
ServeProto::write(localStore, conn, outputs);
conn.to.flush();
while (true) {
auto storePathS = readString(conn.from);
if (storePathS == "") break;
auto deriver = readString(conn.from); // deriver
auto references = ServeProto::Serialise<StorePathSet>::read(localStore, conn);
readLongLong(conn.from); // download size
auto narSize = readLongLong(conn.from);
auto narHash = Hash::parseAny(readString(conn.from), htSHA256);
auto ca = ContentAddress::parseOpt(readString(conn.from));
readStrings<StringSet>(conn.from); // sigs
ValidPathInfo info(localStore.parseStorePath(storePathS), narHash);
assert(outputs.count(info.path));
info.references = references;
info.narSize = narSize;
totalNarSize += info.narSize;
info.narHash = narHash;
info.ca = ca;
if (deriver != "")
info.deriver = localStore.parseStorePath(deriver);
infos.insert_or_assign(info.path, info);
}
return infos;
}
static void copyPathFromRemote(
Machine::Connection & conn,
NarMemberDatas & narMembers,
Store & localStore,
Store & destStore,
const ValidPathInfo & info
)
{
/* Receive the NAR from the remote and add it to the
destination store. Meanwhile, extract all the info from the
NAR that getBuildOutput() needs. */
auto source2 = sinkToSource([&](Sink & sink)
{
/* Note: we should only send the command to dump the store
path to the remote if the NAR is actually going to get read
by the destination store, which won't happen if this path
is already valid on the destination store. Since this
lambda function only gets executed if someone tries to read
from source2, we will send the command from here rather
than outside the lambda. */
conn.to << ServeProto::Command::DumpStorePath << localStore.printStorePath(info.path);
conn.to.flush();
TeeSource tee(conn.from, sink);
extractNarData(tee, localStore.printStorePath(info.path), narMembers);
});
destStore.addToStore(info, *source2, NoRepair, NoCheckSigs);
}
static void copyPathsFromRemote(
Machine::Connection & conn,
NarMemberDatas & narMembers,
Store & localStore,
Store & destStore,
const std::map<StorePath, ValidPathInfo> & infos
)
{
auto pathsSorted = reverseTopoSortPaths(infos);
for (auto & path : pathsSorted) {
auto & info = infos.find(path)->second;
copyPathFromRemote(conn, narMembers, localStore, destStore, info);
}
}
}
/* using namespace nix::build_remote; */
void RemoteResult::updateWithBuildResult(const nix::BuildResult & buildResult)
{
startTime = buildResult.startTime;
stopTime = buildResult.stopTime;
timesBuilt = buildResult.timesBuilt;
errorMsg = buildResult.errorMsg;
isNonDeterministic = buildResult.isNonDeterministic;
switch ((BuildResult::Status) buildResult.status) {
case BuildResult::Built:
stepStatus = bsSuccess;
break;
case BuildResult::Substituted:
case BuildResult::AlreadyValid:
stepStatus = bsSuccess;
isCached = true;
break;
case BuildResult::PermanentFailure:
stepStatus = bsFailed;
canCache = true;
errorMsg = "";
break;
case BuildResult::InputRejected:
case BuildResult::OutputRejected:
stepStatus = bsFailed;
canCache = true;
break;
case BuildResult::TransientFailure:
stepStatus = bsFailed;
canRetry = true;
errorMsg = "";
break;
case BuildResult::TimedOut:
stepStatus = bsTimedOut;
errorMsg = "";
break;
case BuildResult::MiscFailure:
stepStatus = bsAborted;
canRetry = true;
break;
case BuildResult::LogLimitExceeded:
stepStatus = bsLogLimitExceeded;
break;
case BuildResult::NotDeterministic:
stepStatus = bsNotDeterministic;
canRetry = false;
canCache = true;
break;
default:
stepStatus = bsAborted;
break;
}
}
void State::buildRemote(ref<Store> destStore, void State::buildRemote(ref<Store> destStore,
Machine::ptr machine, Step::ptr step, Machine::ptr machine, Step::ptr step,
unsigned int maxSilentTime, unsigned int buildTimeout, unsigned int repeats, const BuildOptions & buildOptions,
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep, RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,
std::function<void(StepState)> updateStep, std::function<void(StepState)> updateStep,
NarMemberDatas & narMembers) NarMemberDatas & narMembers)
{ {
assert(BuildResult::TimedOut == 8); assert(BuildResult::TimedOut == 8);
std::string base(step->drvPath.to_string()); auto [logFile, logFD] = build_remote::openLogFile(logDir, step->drvPath);
result.logFile = logDir + "/" + std::string(base, 0, 2) + "/" + std::string(base, 2); AutoDelete logFileDel(logFile, false);
AutoDelete autoDelete(result.logFile, false); result.logFile = logFile;
createDirs(dirOf(result.logFile));
AutoCloseFD logFD = open(result.logFile.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0666);
if (!logFD) throw SysError("creating log file %s", result.logFile);
nix::Path tmpDir = createTempDir(); nix::Path tmpDir = createTempDir();
AutoDelete tmpDirDel(tmpDir, true); AutoDelete tmpDirDel(tmpDir, true);
@ -214,7 +497,7 @@ void State::buildRemote(ref<Store> destStore,
// FIXME: rewrite to use Store. // FIXME: rewrite to use Store.
Child child; Child child;
openConnection(machine, tmpDir, logFD.get(), child); build_remote::openConnection(machine, tmpDir, logFD.get(), child);
{ {
auto activeStepState(activeStep->state_.lock()); auto activeStepState(activeStep->state_.lock());
@ -234,45 +517,25 @@ void State::buildRemote(ref<Store> destStore,
process. Meh. */ process. Meh. */
}); });
FdSource from(child.from.get()); Machine::Connection conn {
FdSink to(child.to.get()); .from = child.from.get(),
.to = child.to.get(),
.machine = machine,
};
Finally updateStats([&]() { Finally updateStats([&]() {
bytesReceived += from.read; bytesReceived += conn.from.read;
bytesSent += to.written; bytesSent += conn.to.written;
}); });
/* Handshake. */
ServeProto::Version remoteVersion;
try { try {
to << SERVE_MAGIC_1 << 0x206; build_remote::handshake(conn, buildOptions.repeats);
to.flush();
unsigned int magic = readInt(from);
if (magic != SERVE_MAGIC_2)
throw Error("protocol mismatch with nix-store --serve on %1%", machine->sshName);
remoteVersion = readInt(from);
if (GET_PROTOCOL_MAJOR(remoteVersion) != 0x200)
throw Error("unsupported nix-store --serve protocol version on %1%", machine->sshName);
if (GET_PROTOCOL_MINOR(remoteVersion) < 3 && repeats > 0)
throw Error("machine %1% does not support repeating a build; please upgrade it to Nix 1.12", machine->sshName);
} catch (EndOfFile & e) { } catch (EndOfFile & e) {
child.pid.wait(); child.pid.wait();
std::string s = chomp(readFile(result.logFile)); std::string s = chomp(readFile(result.logFile));
throw Error("cannot connect to %1%: %2%", machine->sshName, s); throw Error("cannot connect to %1%: %2%", machine->sshName, s);
} }
ServeProto::ReadConn rconn {
.from = from,
.version = remoteVersion,
};
ServeProto::WriteConn wconn {
.to = to,
.version = remoteVersion,
};
{ {
auto info(machine->state->connectInfo.lock()); auto info(machine->state->connectInfo.lock());
info->consecutiveFailures = 0; info->consecutiveFailures = 0;
@ -284,59 +547,9 @@ void State::buildRemote(ref<Store> destStore,
copy the immediate sources of the derivation and the required copy the immediate sources of the derivation and the required
outputs of the input derivations. */ outputs of the input derivations. */
updateStep(ssSendingInputs); updateStep(ssSendingInputs);
BasicDerivation resolvedDrv = build_remote::sendInputs(*this, *step, *localStore, *destStore, conn, result.overhead, nrStepsWaiting, nrStepsCopyingTo);
StorePathSet inputs; logFileDel.cancel();
BasicDerivation basicDrv(*step->drv);
for (auto & p : step->drv->inputSrcs)
inputs.insert(p);
for (auto & [drvPath, node] : step->drv->inputDrvs.map) {
auto drv2 = localStore->readDerivation(drvPath);
for (auto & name : node.value) {
if (auto i = get(drv2.outputs, name)) {
auto outPath = i->path(*localStore, drv2.name, name);
inputs.insert(*outPath);
basicDrv.inputSrcs.insert(*outPath);
}
}
}
/* Ensure that the inputs exist in the destination store. This is
a no-op for regular stores, but for the binary cache store,
this will copy the inputs to the binary cache from the local
store. */
if (localStore != std::shared_ptr<Store>(destStore)) {
copyClosure(*localStore, *destStore,
step->drv->inputSrcs,
NoRepair, NoCheckSigs, NoSubstitute);
}
{
auto mc1 = std::make_shared<MaintainCount<counter>>(nrStepsWaiting);
mc1.reset();
MaintainCount<counter> mc2(nrStepsCopyingTo);
printMsg(lvlDebug, "sending closure of %s to %s",
localStore->printStorePath(step->drvPath), machine->sshName);
auto now1 = std::chrono::steady_clock::now();
/* Copy the input closure. */
if (machine->isLocalhost()) {
StorePathSet closure;
destStore->computeFSClosure(inputs, closure);
copyPaths(*destStore, *localStore, closure, NoRepair, NoCheckSigs, NoSubstitute);
} else {
copyClosureTo(machine->state->sendLock, *destStore, from, to, remoteVersion, inputs, true);
}
auto now2 = std::chrono::steady_clock::now();
result.overhead += std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
}
autoDelete.cancel();
/* Truncate the log to get rid of messages about substitutions /* Truncate the log to get rid of messages about substitutions
etc. on the remote system. */ etc. on the remote system. */
@ -355,85 +568,17 @@ void State::buildRemote(ref<Store> destStore,
updateStep(ssBuilding); updateStep(ssBuilding);
to << ServeProto::Command::BuildDerivation << localStore->printStorePath(step->drvPath); BuildResult buildResult = build_remote::performBuild(
writeDerivation(to, *localStore, basicDrv); conn,
to << maxSilentTime << buildTimeout; *localStore,
if (GET_PROTOCOL_MINOR(remoteVersion) >= 2) step->drvPath,
to << maxLogSize; resolvedDrv,
if (GET_PROTOCOL_MINOR(remoteVersion) >= 3) { buildOptions,
to << repeats // == build-repeat nrStepsBuilding
<< step->isDeterministic; // == enforce-determinism );
}
to.flush();
result.startTime = time(0); result.updateWithBuildResult(buildResult);
int res;
{
MaintainCount<counter> mc(nrStepsBuilding);
res = readInt(from);
}
result.stopTime = time(0);
result.errorMsg = readString(from);
if (GET_PROTOCOL_MINOR(remoteVersion) >= 3) {
result.timesBuilt = readInt(from);
result.isNonDeterministic = readInt(from);
auto start = readInt(from);
auto stop = readInt(from);
if (start && start) {
/* Note: this represents the duration of a single
round, rather than all rounds. */
result.startTime = start;
result.stopTime = stop;
}
}
if (GET_PROTOCOL_MINOR(remoteVersion) >= 6) {
ServeProto::Serialise<DrvOutputs>::read(*localStore, rconn);
}
switch ((BuildResult::Status) res) {
case BuildResult::Built:
result.stepStatus = bsSuccess;
break;
case BuildResult::Substituted:
case BuildResult::AlreadyValid:
result.stepStatus = bsSuccess;
result.isCached = true;
break;
case BuildResult::PermanentFailure:
result.stepStatus = bsFailed;
result.canCache = true;
result.errorMsg = "";
break;
case BuildResult::InputRejected:
case BuildResult::OutputRejected:
result.stepStatus = bsFailed;
result.canCache = true;
break;
case BuildResult::TransientFailure:
result.stepStatus = bsFailed;
result.canRetry = true;
result.errorMsg = "";
break;
case BuildResult::TimedOut:
result.stepStatus = bsTimedOut;
result.errorMsg = "";
break;
case BuildResult::MiscFailure:
result.stepStatus = bsAborted;
result.canRetry = true;
break;
case BuildResult::LogLimitExceeded:
result.stepStatus = bsLogLimitExceeded;
break;
case BuildResult::NotDeterministic:
result.stepStatus = bsNotDeterministic;
result.canRetry = false;
result.canCache = true;
break;
default:
result.stepStatus = bsAborted;
break;
}
if (result.stepStatus != bsSuccess) return; if (result.stepStatus != bsSuccess) return;
result.errorMsg = ""; result.errorMsg = "";
@ -461,33 +606,8 @@ void State::buildRemote(ref<Store> destStore,
outputs.insert(*i.second.second); outputs.insert(*i.second.second);
} }
/* Get info about each output path. */
std::map<StorePath, ValidPathInfo> infos;
size_t totalNarSize = 0; size_t totalNarSize = 0;
to << ServeProto::Command::QueryPathInfos; auto infos = build_remote::queryPathInfos(conn, *localStore, outputs, totalNarSize);
ServeProto::write(*localStore, wconn, outputs);
to.flush();
while (true) {
auto storePathS = readString(from);
if (storePathS == "") break;
auto deriver = readString(from); // deriver
auto references = ServeProto::Serialise<StorePathSet>::read(*localStore, rconn);
readLongLong(from); // download size
auto narSize = readLongLong(from);
auto narHash = Hash::parseAny(readString(from), htSHA256);
auto ca = ContentAddress::parseOpt(readString(from));
readStrings<StringSet>(from); // sigs
ValidPathInfo info(localStore->parseStorePath(storePathS), narHash);
assert(outputs.count(info.path));
info.references = references;
info.narSize = narSize;
totalNarSize += info.narSize;
info.narHash = narHash;
info.ca = ca;
if (deriver != "")
info.deriver = localStore->parseStorePath(deriver);
infos.insert_or_assign(info.path, info);
}
if (totalNarSize > maxOutputSize) { if (totalNarSize > maxOutputSize) {
result.stepStatus = bsNarSizeLimitExceeded; result.stepStatus = bsNarSizeLimitExceeded;
@ -498,33 +618,7 @@ void State::buildRemote(ref<Store> destStore,
printMsg(lvlDebug, "copying outputs of %s from %s (%d bytes)", printMsg(lvlDebug, "copying outputs of %s from %s (%d bytes)",
localStore->printStorePath(step->drvPath), machine->sshName, totalNarSize); localStore->printStorePath(step->drvPath), machine->sshName, totalNarSize);
auto pathsSorted = reverseTopoSortPaths(infos); build_remote::copyPathsFromRemote(conn, narMembers, *localStore, *destStore, infos);
for (auto & path : pathsSorted) {
auto & info = infos.find(path)->second;
/* Receive the NAR from the remote and add it to the
destination store. Meanwhile, extract all the info from the
NAR that getBuildOutput() needs. */
auto source2 = sinkToSource([&](Sink & sink)
{
/* Note: we should only send the command to dump the store
path to the remote if the NAR is actually going to get read
by the destination store, which won't happen if this path
is already valid on the destination store. Since this
lambda function only gets executed if someone tries to read
from source2, we will send the command from here rather
than outside the lambda. */
to << ServeProto::Command::DumpStorePath << localStore->printStorePath(path);
to.flush();
TeeSource tee(from, sink);
extractNarData(tee, localStore->printStorePath(path), narMembers);
});
destStore->addToStore(info, *source2, NoRepair, NoCheckSigs);
}
auto now2 = std::chrono::steady_clock::now(); auto now2 = std::chrono::steady_clock::now();
result.overhead += std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count(); result.overhead += std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();

View file

@ -98,8 +98,10 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
it). */ it). */
BuildID buildId; BuildID buildId;
std::optional<StorePath> buildDrvPath; std::optional<StorePath> buildDrvPath;
unsigned int maxSilentTime, buildTimeout; BuildOptions buildOptions;
unsigned int repeats = step->isDeterministic ? 1 : 0; buildOptions.repeats = step->isDeterministic ? 1 : 0;
buildOptions.maxLogSize = maxLogSize;
buildOptions.enforceDeterminism = step->isDeterministic;
auto conn(dbPool.get()); auto conn(dbPool.get());
@ -134,18 +136,18 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
{ {
auto i = jobsetRepeats.find(std::make_pair(build2->projectName, build2->jobsetName)); auto i = jobsetRepeats.find(std::make_pair(build2->projectName, build2->jobsetName));
if (i != jobsetRepeats.end()) if (i != jobsetRepeats.end())
repeats = std::max(repeats, i->second); buildOptions.repeats = std::max(buildOptions.repeats, i->second);
} }
} }
if (!build) build = *dependents.begin(); if (!build) build = *dependents.begin();
buildId = build->id; buildId = build->id;
buildDrvPath = build->drvPath; buildDrvPath = build->drvPath;
maxSilentTime = build->maxSilentTime; buildOptions.maxSilentTime = build->maxSilentTime;
buildTimeout = build->buildTimeout; buildOptions.buildTimeout = build->buildTimeout;
printInfo("performing step %s %d times on %s (needed by build %d and %d others)", printInfo("performing step %s %d times on %s (needed by build %d and %d others)",
localStore->printStorePath(step->drvPath), repeats + 1, machine->sshName, buildId, (dependents.size() - 1)); localStore->printStorePath(step->drvPath), buildOptions.repeats + 1, machine->sshName, buildId, (dependents.size() - 1));
} }
if (!buildOneDone) if (!buildOneDone)
@ -206,7 +208,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
try { try {
/* FIXME: referring builds may have conflicting timeouts. */ /* FIXME: referring builds may have conflicting timeouts. */
buildRemote(destStore, machine, step, maxSilentTime, buildTimeout, repeats, result, activeStep, updateStep, narMembers); buildRemote(destStore, machine, step, buildOptions, result, activeStep, updateStep, narMembers);
} catch (Error & e) { } catch (Error & e) {
if (activeStep->state_.lock()->cancelled) { if (activeStep->state_.lock()->cancelled) {
printInfo("marking step %d of build %d as cancelled", stepNr, buildId); printInfo("marking step %d of build %d as cancelled", stepNr, buildId);

View file

@ -21,6 +21,7 @@
#include "store-api.hh" #include "store-api.hh"
#include "sync.hh" #include "sync.hh"
#include "nar-extractor.hh" #include "nar-extractor.hh"
#include "serve-protocol.hh"
typedef unsigned int BuildID; typedef unsigned int BuildID;
@ -78,6 +79,8 @@ struct RemoteResult
{ {
return stepStatus == bsCachedFailure ? bsFailed : stepStatus; return stepStatus == bsCachedFailure ? bsFailed : stepStatus;
} }
void updateWithBuildResult(const nix::BuildResult &);
}; };
@ -297,6 +300,32 @@ struct Machine
std::regex r("^(ssh://|ssh-ng://)?localhost$"); std::regex r("^(ssh://|ssh-ng://)?localhost$");
return std::regex_search(sshName, r); return std::regex_search(sshName, r);
} }
// A connection to a machine
struct Connection {
nix::FdSource from;
nix::FdSink to;
nix::ServeProto::Version remoteVersion;
// Backpointer to the machine
ptr machine;
operator nix::ServeProto::ReadConn ()
{
return {
.from = from,
.version = remoteVersion,
};
}
operator nix::ServeProto::WriteConn ()
{
return {
.to = to,
.version = remoteVersion,
};
}
};
}; };
@ -459,6 +488,12 @@ private:
public: public:
State(std::optional<std::string> metricsAddrOpt); State(std::optional<std::string> metricsAddrOpt);
struct BuildOptions {
unsigned int maxSilentTime, buildTimeout, repeats;
size_t maxLogSize;
bool enforceDeterminism;
};
private: private:
nix::MaintainCount<counter> startDbUpdate(); nix::MaintainCount<counter> startDbUpdate();
@ -543,8 +578,7 @@ private:
void buildRemote(nix::ref<nix::Store> destStore, void buildRemote(nix::ref<nix::Store> destStore,
Machine::ptr machine, Step::ptr step, Machine::ptr machine, Step::ptr step,
unsigned int maxSilentTime, unsigned int buildTimeout, const BuildOptions & buildOptions,
unsigned int repeats,
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep, RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,
std::function<void(StepState)> updateStep, std::function<void(StepState)> updateStep,
NarMemberDatas & narMembers); NarMemberDatas & narMembers);