commit
c1bd50a80d
34
flake.lock
34
flake.lock
|
@ -16,58 +16,58 @@
|
||||||
"type": "github"
|
"type": "github"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"lowdown-src": {
|
"libgit2": {
|
||||||
"flake": false,
|
"flake": false,
|
||||||
"locked": {
|
"locked": {
|
||||||
"lastModified": 1633514407,
|
"lastModified": 1697646580,
|
||||||
"narHash": "sha256-Dw32tiMjdK9t3ETl5fzGrutQTzh2rufgZV4A/BbxuD4=",
|
"narHash": "sha256-oX4Z3S9WtJlwvj0uH9HlYcWv+x1hqp8mhXl7HsLu2f0=",
|
||||||
"owner": "kristapsdz",
|
"owner": "libgit2",
|
||||||
"repo": "lowdown",
|
"repo": "libgit2",
|
||||||
"rev": "d2c2b44ff6c27b936ec27358a2653caaef8f73b8",
|
"rev": "45fd9ed7ae1a9b74b957ef4f337bc3c8b3df01b5",
|
||||||
"type": "github"
|
"type": "github"
|
||||||
},
|
},
|
||||||
"original": {
|
"original": {
|
||||||
"owner": "kristapsdz",
|
"owner": "libgit2",
|
||||||
"repo": "lowdown",
|
"repo": "libgit2",
|
||||||
"type": "github"
|
"type": "github"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"nix": {
|
"nix": {
|
||||||
"inputs": {
|
"inputs": {
|
||||||
"flake-compat": "flake-compat",
|
"flake-compat": "flake-compat",
|
||||||
"lowdown-src": "lowdown-src",
|
"libgit2": "libgit2",
|
||||||
"nixpkgs": [
|
"nixpkgs": [
|
||||||
"nixpkgs"
|
"nixpkgs"
|
||||||
],
|
],
|
||||||
"nixpkgs-regression": "nixpkgs-regression"
|
"nixpkgs-regression": "nixpkgs-regression"
|
||||||
},
|
},
|
||||||
"locked": {
|
"locked": {
|
||||||
"lastModified": 1706208340,
|
"lastModified": 1706637536,
|
||||||
"narHash": "sha256-wNyHUEIiKKVs6UXrUzhP7RSJQv0A8jckgcuylzftl8k=",
|
"narHash": "sha256-fjx+nCOzuSxGWfhwWWc8hCsLFZAjZLDDUcbBtldRqbk=",
|
||||||
"owner": "NixOS",
|
"owner": "NixOS",
|
||||||
"repo": "nix",
|
"repo": "nix",
|
||||||
"rev": "2c4bb93ba5a97e7078896ebc36385ce172960e4e",
|
"rev": "8f42912c80c0a03f62f6a3d28a3af05a9762565d",
|
||||||
"type": "github"
|
"type": "github"
|
||||||
},
|
},
|
||||||
"original": {
|
"original": {
|
||||||
"owner": "NixOS",
|
"owner": "NixOS",
|
||||||
"ref": "2.19-maintenance",
|
"ref": "2.20-maintenance",
|
||||||
"repo": "nix",
|
"repo": "nix",
|
||||||
"type": "github"
|
"type": "github"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"nixpkgs": {
|
"nixpkgs": {
|
||||||
"locked": {
|
"locked": {
|
||||||
"lastModified": 1701615100,
|
"lastModified": 1705033721,
|
||||||
"narHash": "sha256-7VI84NGBvlCTduw2aHLVB62NvCiZUlALLqBe5v684Aw=",
|
"narHash": "sha256-K5eJHmL1/kev6WuqyqqbS1cdNnSidIZ3jeqJ7GbrYnQ=",
|
||||||
"owner": "NixOS",
|
"owner": "NixOS",
|
||||||
"repo": "nixpkgs",
|
"repo": "nixpkgs",
|
||||||
"rev": "e9f06adb793d1cca5384907b3b8a4071d5d7cb19",
|
"rev": "a1982c92d8980a0114372973cbdfe0a307f1bdea",
|
||||||
"type": "github"
|
"type": "github"
|
||||||
},
|
},
|
||||||
"original": {
|
"original": {
|
||||||
"owner": "NixOS",
|
"owner": "NixOS",
|
||||||
"ref": "nixos-23.05",
|
"ref": "nixos-23.05-small",
|
||||||
"repo": "nixpkgs",
|
"repo": "nixpkgs",
|
||||||
"type": "github"
|
"type": "github"
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
{
|
{
|
||||||
description = "A Nix-based continuous build system";
|
description = "A Nix-based continuous build system";
|
||||||
|
|
||||||
inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.05";
|
inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.05-small";
|
||||||
inputs.nix.url = "github:NixOS/nix/2.19-maintenance";
|
inputs.nix.url = "github:NixOS/nix/2.20-maintenance";
|
||||||
inputs.nix.inputs.nixpkgs.follows = "nixpkgs";
|
inputs.nix.inputs.nixpkgs.follows = "nixpkgs";
|
||||||
|
|
||||||
# TODO get rid of this once https://github.com/NixOS/nix/pull/9546 is
|
# TODO get rid of this once https://github.com/NixOS/nix/pull/9546 is
|
||||||
|
|
|
@ -89,7 +89,7 @@ struct MyArgs : MixEvalArgs, MixCommonArgs, RootArgs
|
||||||
|
|
||||||
static MyArgs myArgs;
|
static MyArgs myArgs;
|
||||||
|
|
||||||
static std::string queryMetaStrings(EvalState & state, DrvInfo & drv, const std::string & name, const std::string & subAttribute)
|
static std::string queryMetaStrings(EvalState & state, PackageInfo & drv, const std::string & name, const std::string & subAttribute)
|
||||||
{
|
{
|
||||||
Strings res;
|
Strings res;
|
||||||
std::function<void(Value & v)> rec;
|
std::function<void(Value & v)> rec;
|
||||||
|
@ -181,7 +181,7 @@ static void worker(
|
||||||
// CA derivations do not have static output paths, so we
|
// CA derivations do not have static output paths, so we
|
||||||
// have to defensively not query output paths in case we
|
// have to defensively not query output paths in case we
|
||||||
// encounter one.
|
// encounter one.
|
||||||
DrvInfo::Outputs outputs = drv->queryOutputs(
|
PackageInfo::Outputs outputs = drv->queryOutputs(
|
||||||
!experimentalFeatureSettings.isEnabled(Xp::CaDerivations));
|
!experimentalFeatureSettings.isEnabled(Xp::CaDerivations));
|
||||||
|
|
||||||
if (drv->querySystem() == "unknown")
|
if (drv->querySystem() == "unknown")
|
||||||
|
|
|
@ -8,6 +8,7 @@
|
||||||
#include "build-result.hh"
|
#include "build-result.hh"
|
||||||
#include "path.hh"
|
#include "path.hh"
|
||||||
#include "serve-protocol.hh"
|
#include "serve-protocol.hh"
|
||||||
|
#include "serve-protocol-impl.hh"
|
||||||
#include "state.hh"
|
#include "state.hh"
|
||||||
#include "current-process.hh"
|
#include "current-process.hh"
|
||||||
#include "processes.hh"
|
#include "processes.hh"
|
||||||
|
@ -20,12 +21,6 @@
|
||||||
|
|
||||||
using namespace nix;
|
using namespace nix;
|
||||||
|
|
||||||
|
|
||||||
static void append(Strings & dst, const Strings & src)
|
|
||||||
{
|
|
||||||
dst.insert(dst.end(), src.begin(), src.end());
|
|
||||||
}
|
|
||||||
|
|
||||||
namespace nix::build_remote {
|
namespace nix::build_remote {
|
||||||
|
|
||||||
static Strings extraStoreArgs(std::string & machine)
|
static Strings extraStoreArgs(std::string & machine)
|
||||||
|
@ -48,58 +43,20 @@ static Strings extraStoreArgs(std::string & machine)
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void openConnection(::Machine::ptr machine, Path tmpDir, int stderrFD, SSHMaster::Connection & child)
|
static std::unique_ptr<SSHMaster::Connection> openConnection(
|
||||||
|
::Machine::ptr machine, SSHMaster & master)
|
||||||
{
|
{
|
||||||
std::string pgmName;
|
Strings command = {"nix-store", "--serve", "--write"};
|
||||||
Pipe to, from;
|
|
||||||
to.create();
|
|
||||||
from.create();
|
|
||||||
|
|
||||||
Strings argv;
|
|
||||||
if (machine->isLocalhost()) {
|
if (machine->isLocalhost()) {
|
||||||
pgmName = "nix-store";
|
command.push_back("--builders");
|
||||||
argv = {"nix-store", "--builders", "", "--serve", "--write"};
|
command.push_back("");
|
||||||
} else {
|
} else {
|
||||||
pgmName = "ssh";
|
command.splice(command.end(), extraStoreArgs(machine->sshName));
|
||||||
auto sshName = machine->sshName;
|
|
||||||
Strings extraArgs = extraStoreArgs(sshName);
|
|
||||||
argv = {"ssh", sshName};
|
|
||||||
if (machine->sshKey != "") append(argv, {"-i", machine->sshKey});
|
|
||||||
if (machine->sshPublicHostKey != "") {
|
|
||||||
Path fileName = tmpDir + "/host-key";
|
|
||||||
auto p = machine->sshName.find("@");
|
|
||||||
std::string host = p != std::string::npos ? std::string(machine->sshName, p + 1) : machine->sshName;
|
|
||||||
writeFile(fileName, host + " " + machine->sshPublicHostKey + "\n");
|
|
||||||
append(argv, {"-oUserKnownHostsFile=" + fileName});
|
|
||||||
}
|
|
||||||
append(argv,
|
|
||||||
{ "-x", "-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes"
|
|
||||||
, "--", "nix-store", "--serve", "--write" });
|
|
||||||
append(argv, extraArgs);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
child.sshPid = startProcess([&]() {
|
return master.startCommand(std::move(command), {
|
||||||
restoreProcessContext();
|
"-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes"
|
||||||
|
|
||||||
if (dup2(to.readSide.get(), STDIN_FILENO) == -1)
|
|
||||||
throw SysError("cannot dup input pipe to stdin");
|
|
||||||
|
|
||||||
if (dup2(from.writeSide.get(), STDOUT_FILENO) == -1)
|
|
||||||
throw SysError("cannot dup output pipe to stdout");
|
|
||||||
|
|
||||||
if (dup2(stderrFD, STDERR_FILENO) == -1)
|
|
||||||
throw SysError("cannot dup stderr");
|
|
||||||
|
|
||||||
execvp(argv.front().c_str(), (char * *) stringsToCharPtrs(argv).data()); // FIXME: remove cast
|
|
||||||
|
|
||||||
throw SysError("cannot start %s", pgmName);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
to.readSide = -1;
|
|
||||||
from.writeSide = -1;
|
|
||||||
|
|
||||||
child.in = to.writeSide.release();
|
|
||||||
child.out = from.readSide.release();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -117,13 +74,10 @@ static void copyClosureTo(
|
||||||
garbage-collect paths that are already there. Optionally, ask
|
garbage-collect paths that are already there. Optionally, ask
|
||||||
the remote host to substitute missing paths. */
|
the remote host to substitute missing paths. */
|
||||||
// FIXME: substitute output pollutes our build log
|
// FIXME: substitute output pollutes our build log
|
||||||
conn.to << ServeProto::Command::QueryValidPaths << 1 << useSubstitutes;
|
|
||||||
ServeProto::write(destStore, conn, closure);
|
|
||||||
conn.to.flush();
|
|
||||||
|
|
||||||
/* Get back the set of paths that are already valid on the remote
|
/* Get back the set of paths that are already valid on the remote
|
||||||
host. */
|
host. */
|
||||||
auto present = ServeProto::Serialise<StorePathSet>::read(destStore, conn);
|
auto present = conn.queryValidPaths(
|
||||||
|
destStore, true, closure, useSubstitutes);
|
||||||
|
|
||||||
if (present.size() == closure.size()) return;
|
if (present.size() == closure.size()) return;
|
||||||
|
|
||||||
|
@ -148,7 +102,7 @@ static void copyClosureTo(
|
||||||
|
|
||||||
|
|
||||||
// FIXME: use Store::topoSortPaths().
|
// FIXME: use Store::topoSortPaths().
|
||||||
static StorePaths reverseTopoSortPaths(const std::map<StorePath, ValidPathInfo> & paths)
|
static StorePaths reverseTopoSortPaths(const std::map<StorePath, UnkeyedValidPathInfo> & paths)
|
||||||
{
|
{
|
||||||
StorePaths sorted;
|
StorePaths sorted;
|
||||||
StorePathSet visited;
|
StorePathSet visited;
|
||||||
|
@ -189,28 +143,6 @@ static std::pair<Path, AutoCloseFD> openLogFile(const std::string & logDir, cons
|
||||||
return {std::move(logFile), std::move(logFD)};
|
return {std::move(logFile), std::move(logFD)};
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @param conn is not fully initialized; it is this functions job to set
|
|
||||||
* the `remoteVersion` field after the handshake is completed.
|
|
||||||
* Therefore, no `ServeProto::Serialize` functions can be used until
|
|
||||||
* that field is set.
|
|
||||||
*/
|
|
||||||
static void handshake(::Machine::Connection & conn, unsigned int repeats)
|
|
||||||
{
|
|
||||||
conn.to << SERVE_MAGIC_1 << 0x206;
|
|
||||||
conn.to.flush();
|
|
||||||
|
|
||||||
unsigned int magic = readInt(conn.from);
|
|
||||||
if (magic != SERVE_MAGIC_2)
|
|
||||||
throw Error("protocol mismatch with ‘nix-store --serve’ on ‘%1%’", conn.machine->sshName);
|
|
||||||
conn.remoteVersion = readInt(conn.from);
|
|
||||||
// Now `conn` is initialized.
|
|
||||||
if (GET_PROTOCOL_MAJOR(conn.remoteVersion) != 0x200)
|
|
||||||
throw Error("unsupported ‘nix-store --serve’ protocol version on ‘%1%’", conn.machine->sshName);
|
|
||||||
if (GET_PROTOCOL_MINOR(conn.remoteVersion) < 3 && repeats > 0)
|
|
||||||
throw Error("machine ‘%1%’ does not support repeating a build; please upgrade it to Nix 1.12", conn.machine->sshName);
|
|
||||||
}
|
|
||||||
|
|
||||||
static BasicDerivation sendInputs(
|
static BasicDerivation sendInputs(
|
||||||
State & state,
|
State & state,
|
||||||
Step & step,
|
Step & step,
|
||||||
|
@ -281,21 +213,11 @@ static BuildResult performBuild(
|
||||||
Store & localStore,
|
Store & localStore,
|
||||||
StorePath drvPath,
|
StorePath drvPath,
|
||||||
const BasicDerivation & drv,
|
const BasicDerivation & drv,
|
||||||
const State::BuildOptions & options,
|
const ServeProto::BuildOptions & options,
|
||||||
counter & nrStepsBuilding
|
counter & nrStepsBuilding
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
conn.to << ServeProto::Command::BuildDerivation << localStore.printStorePath(drvPath);
|
conn.putBuildDerivationRequest(localStore, drvPath, drv, options);
|
||||||
writeDerivation(conn.to, localStore, drv);
|
|
||||||
conn.to << options.maxSilentTime << options.buildTimeout;
|
|
||||||
if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 2)
|
|
||||||
conn.to << options.maxLogSize;
|
|
||||||
if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 3) {
|
|
||||||
conn.to
|
|
||||||
<< options.repeats // == build-repeat
|
|
||||||
<< options.enforceDeterminism;
|
|
||||||
}
|
|
||||||
conn.to.flush();
|
|
||||||
|
|
||||||
BuildResult result;
|
BuildResult result;
|
||||||
|
|
||||||
|
@ -345,7 +267,7 @@ static BuildResult performBuild(
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
static std::map<StorePath, ValidPathInfo> queryPathInfos(
|
static std::map<StorePath, UnkeyedValidPathInfo> queryPathInfos(
|
||||||
::Machine::Connection & conn,
|
::Machine::Connection & conn,
|
||||||
Store & localStore,
|
Store & localStore,
|
||||||
StorePathSet & outputs,
|
StorePathSet & outputs,
|
||||||
|
@ -354,30 +276,18 @@ static std::map<StorePath, ValidPathInfo> queryPathInfos(
|
||||||
{
|
{
|
||||||
|
|
||||||
/* Get info about each output path. */
|
/* Get info about each output path. */
|
||||||
std::map<StorePath, ValidPathInfo> infos;
|
std::map<StorePath, UnkeyedValidPathInfo> infos;
|
||||||
conn.to << ServeProto::Command::QueryPathInfos;
|
conn.to << ServeProto::Command::QueryPathInfos;
|
||||||
ServeProto::write(localStore, conn, outputs);
|
ServeProto::write(localStore, conn, outputs);
|
||||||
conn.to.flush();
|
conn.to.flush();
|
||||||
while (true) {
|
while (true) {
|
||||||
auto storePathS = readString(conn.from);
|
auto storePathS = readString(conn.from);
|
||||||
if (storePathS == "") break;
|
if (storePathS == "") break;
|
||||||
auto deriver = readString(conn.from); // deriver
|
|
||||||
auto references = ServeProto::Serialise<StorePathSet>::read(localStore, conn);
|
auto storePath = localStore.parseStorePath(storePathS);
|
||||||
readLongLong(conn.from); // download size
|
auto info = ServeProto::Serialise<UnkeyedValidPathInfo>::read(localStore, conn);
|
||||||
auto narSize = readLongLong(conn.from);
|
|
||||||
auto narHash = Hash::parseAny(readString(conn.from), htSHA256);
|
|
||||||
auto ca = ContentAddress::parseOpt(readString(conn.from));
|
|
||||||
readStrings<StringSet>(conn.from); // sigs
|
|
||||||
ValidPathInfo info(localStore.parseStorePath(storePathS), narHash);
|
|
||||||
assert(outputs.count(info.path));
|
|
||||||
info.references = references;
|
|
||||||
info.narSize = narSize;
|
|
||||||
totalNarSize += info.narSize;
|
totalNarSize += info.narSize;
|
||||||
info.narHash = narHash;
|
infos.insert_or_assign(std::move(storePath), std::move(info));
|
||||||
info.ca = ca;
|
|
||||||
if (deriver != "")
|
|
||||||
info.deriver = localStore.parseStorePath(deriver);
|
|
||||||
infos.insert_or_assign(info.path, info);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return infos;
|
return infos;
|
||||||
|
@ -418,14 +328,16 @@ static void copyPathsFromRemote(
|
||||||
NarMemberDatas & narMembers,
|
NarMemberDatas & narMembers,
|
||||||
Store & localStore,
|
Store & localStore,
|
||||||
Store & destStore,
|
Store & destStore,
|
||||||
const std::map<StorePath, ValidPathInfo> & infos
|
const std::map<StorePath, UnkeyedValidPathInfo> & infos
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
auto pathsSorted = reverseTopoSortPaths(infos);
|
auto pathsSorted = reverseTopoSortPaths(infos);
|
||||||
|
|
||||||
for (auto & path : pathsSorted) {
|
for (auto & path : pathsSorted) {
|
||||||
auto & info = infos.find(path)->second;
|
auto & info = infos.find(path)->second;
|
||||||
copyPathFromRemote(conn, narMembers, localStore, destStore, info);
|
copyPathFromRemote(
|
||||||
|
conn, narMembers, localStore, destStore,
|
||||||
|
ValidPathInfo { path, info });
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -492,7 +404,7 @@ void RemoteResult::updateWithBuildResult(const nix::BuildResult & buildResult)
|
||||||
|
|
||||||
void State::buildRemote(ref<Store> destStore,
|
void State::buildRemote(ref<Store> destStore,
|
||||||
::Machine::ptr machine, Step::ptr step,
|
::Machine::ptr machine, Step::ptr step,
|
||||||
const BuildOptions & buildOptions,
|
const ServeProto::BuildOptions & buildOptions,
|
||||||
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,
|
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,
|
||||||
std::function<void(StepState)> updateStep,
|
std::function<void(StepState)> updateStep,
|
||||||
NarMemberDatas & narMembers)
|
NarMemberDatas & narMembers)
|
||||||
|
@ -503,21 +415,26 @@ void State::buildRemote(ref<Store> destStore,
|
||||||
AutoDelete logFileDel(logFile, false);
|
AutoDelete logFileDel(logFile, false);
|
||||||
result.logFile = logFile;
|
result.logFile = logFile;
|
||||||
|
|
||||||
nix::Path tmpDir = createTempDir();
|
|
||||||
AutoDelete tmpDirDel(tmpDir, true);
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
updateStep(ssConnecting);
|
updateStep(ssConnecting);
|
||||||
|
|
||||||
|
SSHMaster master {
|
||||||
|
machine->sshName,
|
||||||
|
machine->sshKey,
|
||||||
|
machine->sshPublicHostKey,
|
||||||
|
false, // no SSH master yet
|
||||||
|
false, // no compression yet
|
||||||
|
logFD.get(),
|
||||||
|
};
|
||||||
|
|
||||||
// FIXME: rewrite to use Store.
|
// FIXME: rewrite to use Store.
|
||||||
SSHMaster::Connection child;
|
auto child = build_remote::openConnection(machine, master);
|
||||||
build_remote::openConnection(machine, tmpDir, logFD.get(), child);
|
|
||||||
|
|
||||||
{
|
{
|
||||||
auto activeStepState(activeStep->state_.lock());
|
auto activeStepState(activeStep->state_.lock());
|
||||||
if (activeStepState->cancelled) throw Error("step cancelled");
|
if (activeStepState->cancelled) throw Error("step cancelled");
|
||||||
activeStepState->pid = child.sshPid;
|
activeStepState->pid = child->sshPid;
|
||||||
}
|
}
|
||||||
|
|
||||||
Finally clearPid([&]() {
|
Finally clearPid([&]() {
|
||||||
|
@ -533,9 +450,13 @@ void State::buildRemote(ref<Store> destStore,
|
||||||
});
|
});
|
||||||
|
|
||||||
::Machine::Connection conn {
|
::Machine::Connection conn {
|
||||||
.from = child.out.get(),
|
{
|
||||||
.to = child.in.get(),
|
.to = child->in.get(),
|
||||||
.machine = machine,
|
.from = child->out.get(),
|
||||||
|
/* Handshake. */
|
||||||
|
.remoteVersion = 0xdadbeef, // FIXME avoid dummy initialize
|
||||||
|
},
|
||||||
|
/*.machine =*/ machine,
|
||||||
};
|
};
|
||||||
|
|
||||||
Finally updateStats([&]() {
|
Finally updateStats([&]() {
|
||||||
|
@ -543,14 +464,26 @@ void State::buildRemote(ref<Store> destStore,
|
||||||
bytesSent += conn.to.written;
|
bytesSent += conn.to.written;
|
||||||
});
|
});
|
||||||
|
|
||||||
|
constexpr ServeProto::Version our_version = 0x206;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
build_remote::handshake(conn, buildOptions.repeats);
|
conn.remoteVersion = decltype(conn)::handshake(
|
||||||
|
conn.to,
|
||||||
|
conn.from,
|
||||||
|
our_version,
|
||||||
|
machine->sshName);
|
||||||
} catch (EndOfFile & e) {
|
} catch (EndOfFile & e) {
|
||||||
child.sshPid.wait();
|
child->sshPid.wait();
|
||||||
std::string s = chomp(readFile(result.logFile));
|
std::string s = chomp(readFile(result.logFile));
|
||||||
throw Error("cannot connect to ‘%1%’: %2%", machine->sshName, s);
|
throw Error("cannot connect to ‘%1%’: %2%", machine->sshName, s);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Do not attempt to speak a newer version of the protocol.
|
||||||
|
//
|
||||||
|
// Per https://github.com/NixOS/nix/issues/9584 should be handled as
|
||||||
|
// part of `handshake` in upstream nix.
|
||||||
|
conn.remoteVersion = std::min(conn.remoteVersion, our_version);
|
||||||
|
|
||||||
{
|
{
|
||||||
auto info(machine->state->connectInfo.lock());
|
auto info(machine->state->connectInfo.lock());
|
||||||
info->consecutiveFailures = 0;
|
info->consecutiveFailures = 0;
|
||||||
|
@ -653,8 +586,8 @@ void State::buildRemote(ref<Store> destStore,
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Shut down the connection. */
|
/* Shut down the connection. */
|
||||||
child.in = -1;
|
child->in = -1;
|
||||||
child.sshPid.wait();
|
child->sshPid.wait();
|
||||||
|
|
||||||
} catch (Error & e) {
|
} catch (Error & e) {
|
||||||
/* Disable this machine until a certain period of time has
|
/* Disable this machine until a certain period of time has
|
||||||
|
|
|
@ -98,10 +98,13 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
|
||||||
it). */
|
it). */
|
||||||
BuildID buildId;
|
BuildID buildId;
|
||||||
std::optional<StorePath> buildDrvPath;
|
std::optional<StorePath> buildDrvPath;
|
||||||
BuildOptions buildOptions;
|
// Other fields set below
|
||||||
buildOptions.repeats = step->isDeterministic ? 1 : 0;
|
nix::ServeProto::BuildOptions buildOptions {
|
||||||
buildOptions.maxLogSize = maxLogSize;
|
.maxLogSize = maxLogSize,
|
||||||
buildOptions.enforceDeterminism = step->isDeterministic;
|
.nrRepeats = step->isDeterministic ? 1u : 0u,
|
||||||
|
.enforceDeterminism = step->isDeterministic,
|
||||||
|
.keepFailed = false,
|
||||||
|
};
|
||||||
|
|
||||||
auto conn(dbPool.get());
|
auto conn(dbPool.get());
|
||||||
|
|
||||||
|
@ -136,7 +139,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
|
||||||
{
|
{
|
||||||
auto i = jobsetRepeats.find(std::make_pair(build2->projectName, build2->jobsetName));
|
auto i = jobsetRepeats.find(std::make_pair(build2->projectName, build2->jobsetName));
|
||||||
if (i != jobsetRepeats.end())
|
if (i != jobsetRepeats.end())
|
||||||
buildOptions.repeats = std::max(buildOptions.repeats, i->second);
|
buildOptions.nrRepeats = std::max(buildOptions.nrRepeats, i->second);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!build) build = *dependents.begin();
|
if (!build) build = *dependents.begin();
|
||||||
|
@ -147,7 +150,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
|
||||||
buildOptions.buildTimeout = build->buildTimeout;
|
buildOptions.buildTimeout = build->buildTimeout;
|
||||||
|
|
||||||
printInfo("performing step ‘%s’ %d times on ‘%s’ (needed by build %d and %d others)",
|
printInfo("performing step ‘%s’ %d times on ‘%s’ (needed by build %d and %d others)",
|
||||||
localStore->printStorePath(step->drvPath), buildOptions.repeats + 1, machine->sshName, buildId, (dependents.size() - 1));
|
localStore->printStorePath(step->drvPath), buildOptions.nrRepeats + 1, machine->sshName, buildId, (dependents.size() - 1));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!buildOneDone)
|
if (!buildOneDone)
|
||||||
|
|
|
@ -231,11 +231,11 @@ system_time State::doDispatch()
|
||||||
sort(machinesSorted.begin(), machinesSorted.end(),
|
sort(machinesSorted.begin(), machinesSorted.end(),
|
||||||
[](const MachineInfo & a, const MachineInfo & b) -> bool
|
[](const MachineInfo & a, const MachineInfo & b) -> bool
|
||||||
{
|
{
|
||||||
float ta = std::round(a.currentJobs / a.machine->speedFactorFloat);
|
float ta = std::round(a.currentJobs / a.machine->speedFactor);
|
||||||
float tb = std::round(b.currentJobs / b.machine->speedFactorFloat);
|
float tb = std::round(b.currentJobs / b.machine->speedFactor);
|
||||||
return
|
return
|
||||||
ta != tb ? ta < tb :
|
ta != tb ? ta < tb :
|
||||||
a.machine->speedFactorFloat != b.machine->speedFactorFloat ? a.machine->speedFactorFloat > b.machine->speedFactorFloat :
|
a.machine->speedFactor != b.machine->speedFactor ? a.machine->speedFactor > b.machine->speedFactor :
|
||||||
a.currentJobs > b.currentJobs;
|
a.currentJobs > b.currentJobs;
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -155,16 +155,16 @@ void State::parseMachines(const std::string & contents)
|
||||||
auto machine = std::make_shared<::Machine>(nix::Machine {
|
auto machine = std::make_shared<::Machine>(nix::Machine {
|
||||||
// `storeUri`, not yet used
|
// `storeUri`, not yet used
|
||||||
"",
|
"",
|
||||||
// `systemTypes`, not yet used
|
// `systemTypes`
|
||||||
{},
|
tokenizeString<StringSet>(tokens[1], ","),
|
||||||
// `sshKey`
|
// `sshKey`
|
||||||
tokens[2] == "-" ? "" : tokens[2],
|
tokens[2] == "-" ? "" : tokens[2],
|
||||||
// `maxJobs`
|
// `maxJobs`
|
||||||
tokens[3] != ""
|
tokens[3] != ""
|
||||||
? string2Int<MaxJobs>(tokens[3]).value()
|
? string2Int<MaxJobs>(tokens[3]).value()
|
||||||
: 1,
|
: 1,
|
||||||
// `speedFactor`, not yet used
|
// `speedFactor`
|
||||||
1,
|
atof(tokens[4].c_str()),
|
||||||
// `supportedFeatures`
|
// `supportedFeatures`
|
||||||
std::move(supportedFeatures),
|
std::move(supportedFeatures),
|
||||||
// `mandatoryFeatures`
|
// `mandatoryFeatures`
|
||||||
|
@ -176,8 +176,6 @@ void State::parseMachines(const std::string & contents)
|
||||||
});
|
});
|
||||||
|
|
||||||
machine->sshName = tokens[0];
|
machine->sshName = tokens[0];
|
||||||
machine->systemTypesSet = tokenizeString<StringSet>(tokens[1], ",");
|
|
||||||
machine->speedFactorFloat = atof(tokens[4].c_str());
|
|
||||||
|
|
||||||
/* Re-use the State object of the previous machine with the
|
/* Re-use the State object of the previous machine with the
|
||||||
same name. */
|
same name. */
|
||||||
|
@ -641,7 +639,7 @@ void State::dumpStatus(Connection & conn)
|
||||||
|
|
||||||
json machine = {
|
json machine = {
|
||||||
{"enabled", m->enabled},
|
{"enabled", m->enabled},
|
||||||
{"systemTypes", m->systemTypesSet},
|
{"systemTypes", m->systemTypes},
|
||||||
{"supportedFeatures", m->supportedFeatures},
|
{"supportedFeatures", m->supportedFeatures},
|
||||||
{"mandatoryFeatures", m->mandatoryFeatures},
|
{"mandatoryFeatures", m->mandatoryFeatures},
|
||||||
{"nrStepsDone", s->nrStepsDone.load()},
|
{"nrStepsDone", s->nrStepsDone.load()},
|
||||||
|
|
|
@ -6,7 +6,46 @@
|
||||||
|
|
||||||
using namespace nix;
|
using namespace nix;
|
||||||
|
|
||||||
struct Extractor : ParseSink
|
|
||||||
|
struct NarMemberConstructor : CreateRegularFileSink
|
||||||
|
{
|
||||||
|
NarMemberData & curMember;
|
||||||
|
|
||||||
|
HashSink hashSink = HashSink { HashAlgorithm::SHA256 };
|
||||||
|
|
||||||
|
std::optional<uint64_t> expectedSize;
|
||||||
|
|
||||||
|
NarMemberConstructor(NarMemberData & curMember)
|
||||||
|
: curMember(curMember)
|
||||||
|
{ }
|
||||||
|
|
||||||
|
void isExecutable() override
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void preallocateContents(uint64_t size) override
|
||||||
|
{
|
||||||
|
expectedSize = size;
|
||||||
|
}
|
||||||
|
|
||||||
|
void operator () (std::string_view data) override
|
||||||
|
{
|
||||||
|
assert(expectedSize);
|
||||||
|
*curMember.fileSize += data.size();
|
||||||
|
hashSink(data);
|
||||||
|
if (curMember.contents) {
|
||||||
|
curMember.contents->append(data);
|
||||||
|
}
|
||||||
|
assert(curMember.fileSize <= expectedSize);
|
||||||
|
if (curMember.fileSize == expectedSize) {
|
||||||
|
auto [hash, len] = hashSink.finish();
|
||||||
|
assert(curMember.fileSize == len);
|
||||||
|
curMember.sha256 = hash;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct Extractor : FileSystemObjectSink
|
||||||
{
|
{
|
||||||
std::unordered_set<Path> filesToKeep {
|
std::unordered_set<Path> filesToKeep {
|
||||||
"/nix-support/hydra-build-products",
|
"/nix-support/hydra-build-products",
|
||||||
|
@ -15,7 +54,6 @@ struct Extractor : ParseSink
|
||||||
};
|
};
|
||||||
|
|
||||||
NarMemberDatas & members;
|
NarMemberDatas & members;
|
||||||
NarMemberData * curMember = nullptr;
|
|
||||||
Path prefix;
|
Path prefix;
|
||||||
|
|
||||||
Extractor(NarMemberDatas & members, const Path & prefix)
|
Extractor(NarMemberDatas & members, const Path & prefix)
|
||||||
|
@ -27,53 +65,22 @@ struct Extractor : ParseSink
|
||||||
members.insert_or_assign(prefix + path, NarMemberData { .type = SourceAccessor::Type::tDirectory });
|
members.insert_or_assign(prefix + path, NarMemberData { .type = SourceAccessor::Type::tDirectory });
|
||||||
}
|
}
|
||||||
|
|
||||||
void createRegularFile(const Path & path) override
|
void createRegularFile(const Path & path, std::function<void(CreateRegularFileSink &)> func) override
|
||||||
{
|
{
|
||||||
curMember = &members.insert_or_assign(prefix + path, NarMemberData {
|
NarMemberConstructor nmc {
|
||||||
.type = SourceAccessor::Type::tRegular,
|
members.insert_or_assign(prefix + path, NarMemberData {
|
||||||
.fileSize = 0,
|
.type = SourceAccessor::Type::tRegular,
|
||||||
.contents = filesToKeep.count(path) ? std::optional("") : std::nullopt,
|
.fileSize = 0,
|
||||||
}).first->second;
|
.contents = filesToKeep.count(path) ? std::optional("") : std::nullopt,
|
||||||
}
|
}).first->second,
|
||||||
|
};
|
||||||
std::optional<uint64_t> expectedSize;
|
func(nmc);
|
||||||
std::unique_ptr<HashSink> hashSink;
|
|
||||||
|
|
||||||
void preallocateContents(uint64_t size) override
|
|
||||||
{
|
|
||||||
expectedSize = size;
|
|
||||||
hashSink = std::make_unique<HashSink>(htSHA256);
|
|
||||||
}
|
|
||||||
|
|
||||||
void receiveContents(std::string_view data) override
|
|
||||||
{
|
|
||||||
assert(expectedSize);
|
|
||||||
assert(curMember);
|
|
||||||
assert(hashSink);
|
|
||||||
*curMember->fileSize += data.size();
|
|
||||||
(*hashSink)(data);
|
|
||||||
if (curMember->contents) {
|
|
||||||
curMember->contents->append(data);
|
|
||||||
}
|
|
||||||
assert(curMember->fileSize <= expectedSize);
|
|
||||||
if (curMember->fileSize == expectedSize) {
|
|
||||||
auto [hash, len] = hashSink->finish();
|
|
||||||
assert(curMember->fileSize == len);
|
|
||||||
curMember->sha256 = hash;
|
|
||||||
hashSink.reset();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void createSymlink(const Path & path, const std::string & target) override
|
void createSymlink(const Path & path, const std::string & target) override
|
||||||
{
|
{
|
||||||
members.insert_or_assign(prefix + path, NarMemberData { .type = SourceAccessor::Type::tSymlink });
|
members.insert_or_assign(prefix + path, NarMemberData { .type = SourceAccessor::Type::tSymlink });
|
||||||
}
|
}
|
||||||
|
|
||||||
void isExecutable() override
|
|
||||||
{ }
|
|
||||||
|
|
||||||
void closeRegularFile() override
|
|
||||||
{ }
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -696,7 +696,7 @@ BuildOutput State::getBuildOutputCached(Connection & conn, nix::ref<nix::Store>
|
||||||
product.fileSize = row[2].as<off_t>();
|
product.fileSize = row[2].as<off_t>();
|
||||||
}
|
}
|
||||||
if (!row[3].is_null())
|
if (!row[3].is_null())
|
||||||
product.sha256hash = Hash::parseAny(row[3].as<std::string>(), htSHA256);
|
product.sha256hash = Hash::parseAny(row[3].as<std::string>(), HashAlgorithm::SHA256);
|
||||||
if (!row[4].is_null())
|
if (!row[4].is_null())
|
||||||
product.path = row[4].as<std::string>();
|
product.path = row[4].as<std::string>();
|
||||||
product.name = row[5].as<std::string>();
|
product.name = row[5].as<std::string>();
|
||||||
|
|
|
@ -22,6 +22,7 @@
|
||||||
#include "sync.hh"
|
#include "sync.hh"
|
||||||
#include "nar-extractor.hh"
|
#include "nar-extractor.hh"
|
||||||
#include "serve-protocol.hh"
|
#include "serve-protocol.hh"
|
||||||
|
#include "serve-protocol-impl.hh"
|
||||||
#include "machines.hh"
|
#include "machines.hh"
|
||||||
|
|
||||||
|
|
||||||
|
@ -243,14 +244,6 @@ struct Machine : nix::Machine
|
||||||
we are not yet used to, but once we are, we don't need this. */
|
we are not yet used to, but once we are, we don't need this. */
|
||||||
std::string sshName;
|
std::string sshName;
|
||||||
|
|
||||||
/* TODO Get rid once `nix::Machine::systemTypes` is a set not
|
|
||||||
vector. */
|
|
||||||
std::set<std::string> systemTypesSet;
|
|
||||||
|
|
||||||
/* TODO Get rid once `nix::Machine::systemTypes` is a `float` not
|
|
||||||
an `int`. */
|
|
||||||
float speedFactorFloat = 1.0;
|
|
||||||
|
|
||||||
struct State {
|
struct State {
|
||||||
typedef std::shared_ptr<State> ptr;
|
typedef std::shared_ptr<State> ptr;
|
||||||
counter currentJobs{0};
|
counter currentJobs{0};
|
||||||
|
@ -277,7 +270,7 @@ struct Machine : nix::Machine
|
||||||
{
|
{
|
||||||
/* Check that this machine is of the type required by the
|
/* Check that this machine is of the type required by the
|
||||||
step. */
|
step. */
|
||||||
if (!systemTypesSet.count(step->drv->platform == "builtin" ? nix::settings.thisSystem : step->drv->platform))
|
if (!systemTypes.count(step->drv->platform == "builtin" ? nix::settings.thisSystem : step->drv->platform))
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
/* Check that the step requires all mandatory features of this
|
/* Check that the step requires all mandatory features of this
|
||||||
|
@ -307,29 +300,9 @@ struct Machine : nix::Machine
|
||||||
}
|
}
|
||||||
|
|
||||||
// A connection to a machine
|
// A connection to a machine
|
||||||
struct Connection {
|
struct Connection : nix::ServeProto::BasicClientConnection {
|
||||||
nix::FdSource from;
|
|
||||||
nix::FdSink to;
|
|
||||||
nix::ServeProto::Version remoteVersion;
|
|
||||||
|
|
||||||
// Backpointer to the machine
|
// Backpointer to the machine
|
||||||
ptr machine;
|
ptr machine;
|
||||||
|
|
||||||
operator nix::ServeProto::ReadConn ()
|
|
||||||
{
|
|
||||||
return {
|
|
||||||
.from = from,
|
|
||||||
.version = remoteVersion,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
operator nix::ServeProto::WriteConn ()
|
|
||||||
{
|
|
||||||
return {
|
|
||||||
.to = to,
|
|
||||||
.version = remoteVersion,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -464,7 +437,7 @@ private:
|
||||||
|
|
||||||
/* How often the build steps of a jobset should be repeated in
|
/* How often the build steps of a jobset should be repeated in
|
||||||
order to detect non-determinism. */
|
order to detect non-determinism. */
|
||||||
std::map<std::pair<std::string, std::string>, unsigned int> jobsetRepeats;
|
std::map<std::pair<std::string, std::string>, size_t> jobsetRepeats;
|
||||||
|
|
||||||
bool uploadLogsToBinaryCache;
|
bool uploadLogsToBinaryCache;
|
||||||
|
|
||||||
|
@ -493,12 +466,6 @@ private:
|
||||||
public:
|
public:
|
||||||
State(std::optional<std::string> metricsAddrOpt);
|
State(std::optional<std::string> metricsAddrOpt);
|
||||||
|
|
||||||
struct BuildOptions {
|
|
||||||
unsigned int maxSilentTime, buildTimeout, repeats;
|
|
||||||
size_t maxLogSize;
|
|
||||||
bool enforceDeterminism;
|
|
||||||
};
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
nix::MaintainCount<counter> startDbUpdate();
|
nix::MaintainCount<counter> startDbUpdate();
|
||||||
|
@ -583,7 +550,7 @@ private:
|
||||||
|
|
||||||
void buildRemote(nix::ref<nix::Store> destStore,
|
void buildRemote(nix::ref<nix::Store> destStore,
|
||||||
Machine::ptr machine, Step::ptr step,
|
Machine::ptr machine, Step::ptr step,
|
||||||
const BuildOptions & buildOptions,
|
const nix::ServeProto::BuildOptions & buildOptions,
|
||||||
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,
|
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,
|
||||||
std::function<void(StepState)> updateStep,
|
std::function<void(StepState)> updateStep,
|
||||||
NarMemberDatas & narMembers);
|
NarMemberDatas & narMembers);
|
||||||
|
|
Loading…
Reference in a new issue