Merge branch 'nix-next' into nix-2.20

This commit is contained in:
John Ericson 2024-01-30 13:26:45 -05:00
commit 7b826ec5ad
10 changed files with 151 additions and 244 deletions

View file

@ -16,58 +16,57 @@
"type": "github" "type": "github"
} }
}, },
"lowdown-src": { "libgit2": {
"flake": false, "flake": false,
"locked": { "locked": {
"lastModified": 1633514407, "lastModified": 1697646580,
"narHash": "sha256-Dw32tiMjdK9t3ETl5fzGrutQTzh2rufgZV4A/BbxuD4=", "narHash": "sha256-oX4Z3S9WtJlwvj0uH9HlYcWv+x1hqp8mhXl7HsLu2f0=",
"owner": "kristapsdz", "owner": "libgit2",
"repo": "lowdown", "repo": "libgit2",
"rev": "d2c2b44ff6c27b936ec27358a2653caaef8f73b8", "rev": "45fd9ed7ae1a9b74b957ef4f337bc3c8b3df01b5",
"type": "github" "type": "github"
}, },
"original": { "original": {
"owner": "kristapsdz", "owner": "libgit2",
"repo": "lowdown", "repo": "libgit2",
"type": "github" "type": "github"
} }
}, },
"nix": { "nix": {
"inputs": { "inputs": {
"flake-compat": "flake-compat", "flake-compat": "flake-compat",
"lowdown-src": "lowdown-src", "libgit2": "libgit2",
"nixpkgs": [ "nixpkgs": [
"nixpkgs" "nixpkgs"
], ],
"nixpkgs-regression": "nixpkgs-regression" "nixpkgs-regression": "nixpkgs-regression"
}, },
"locked": { "locked": {
"lastModified": 1706208340, "lastModified": 1706195509,
"narHash": "sha256-wNyHUEIiKKVs6UXrUzhP7RSJQv0A8jckgcuylzftl8k=", "narHash": "sha256-1kwfk7H/MWZAcTKHnnWXo/+KlQeOTIRtOIzc4FX3QnE=",
"owner": "NixOS", "owner": "NixOS",
"repo": "nix", "repo": "nix",
"rev": "2c4bb93ba5a97e7078896ebc36385ce172960e4e", "rev": "8df68a213fc52a57b02a57005b0e06cc8de40ce3",
"type": "github" "type": "github"
}, },
"original": { "original": {
"owner": "NixOS", "owner": "NixOS",
"ref": "2.19-maintenance",
"repo": "nix", "repo": "nix",
"type": "github" "type": "github"
} }
}, },
"nixpkgs": { "nixpkgs": {
"locked": { "locked": {
"lastModified": 1701615100, "lastModified": 1705033721,
"narHash": "sha256-7VI84NGBvlCTduw2aHLVB62NvCiZUlALLqBe5v684Aw=", "narHash": "sha256-K5eJHmL1/kev6WuqyqqbS1cdNnSidIZ3jeqJ7GbrYnQ=",
"owner": "NixOS", "owner": "NixOS",
"repo": "nixpkgs", "repo": "nixpkgs",
"rev": "e9f06adb793d1cca5384907b3b8a4071d5d7cb19", "rev": "a1982c92d8980a0114372973cbdfe0a307f1bdea",
"type": "github" "type": "github"
}, },
"original": { "original": {
"owner": "NixOS", "owner": "NixOS",
"ref": "nixos-23.05", "ref": "nixos-23.05-small",
"repo": "nixpkgs", "repo": "nixpkgs",
"type": "github" "type": "github"
} }

View file

@ -1,8 +1,8 @@
{ {
description = "A Nix-based continuous build system"; description = "A Nix-based continuous build system";
inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.05"; inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.05-small";
inputs.nix.url = "github:NixOS/nix/2.19-maintenance"; inputs.nix.url = "github:NixOS/nix";
inputs.nix.inputs.nixpkgs.follows = "nixpkgs"; inputs.nix.inputs.nixpkgs.follows = "nixpkgs";
# TODO get rid of this once https://github.com/NixOS/nix/pull/9546 is # TODO get rid of this once https://github.com/NixOS/nix/pull/9546 is

View file

@ -89,7 +89,7 @@ struct MyArgs : MixEvalArgs, MixCommonArgs, RootArgs
static MyArgs myArgs; static MyArgs myArgs;
static std::string queryMetaStrings(EvalState & state, DrvInfo & drv, const std::string & name, const std::string & subAttribute) static std::string queryMetaStrings(EvalState & state, PackageInfo & drv, const std::string & name, const std::string & subAttribute)
{ {
Strings res; Strings res;
std::function<void(Value & v)> rec; std::function<void(Value & v)> rec;
@ -181,7 +181,7 @@ static void worker(
// CA derivations do not have static output paths, so we // CA derivations do not have static output paths, so we
// have to defensively not query output paths in case we // have to defensively not query output paths in case we
// encounter one. // encounter one.
DrvInfo::Outputs outputs = drv->queryOutputs( PackageInfo::Outputs outputs = drv->queryOutputs(
!experimentalFeatureSettings.isEnabled(Xp::CaDerivations)); !experimentalFeatureSettings.isEnabled(Xp::CaDerivations));
if (drv->querySystem() == "unknown") if (drv->querySystem() == "unknown")

View file

@ -8,6 +8,7 @@
#include "build-result.hh" #include "build-result.hh"
#include "path.hh" #include "path.hh"
#include "serve-protocol.hh" #include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "state.hh" #include "state.hh"
#include "current-process.hh" #include "current-process.hh"
#include "processes.hh" #include "processes.hh"
@ -20,12 +21,6 @@
using namespace nix; using namespace nix;
static void append(Strings & dst, const Strings & src)
{
dst.insert(dst.end(), src.begin(), src.end());
}
namespace nix::build_remote { namespace nix::build_remote {
static Strings extraStoreArgs(std::string & machine) static Strings extraStoreArgs(std::string & machine)
@ -48,58 +43,20 @@ static Strings extraStoreArgs(std::string & machine)
return result; return result;
} }
static void openConnection(::Machine::ptr machine, Path tmpDir, int stderrFD, SSHMaster::Connection & child) static std::unique_ptr<SSHMaster::Connection> openConnection(
::Machine::ptr machine, SSHMaster & master)
{ {
std::string pgmName; Strings command = {"nix-store", "--serve", "--write"};
Pipe to, from;
to.create();
from.create();
Strings argv;
if (machine->isLocalhost()) { if (machine->isLocalhost()) {
pgmName = "nix-store"; command.push_back("--builders");
argv = {"nix-store", "--builders", "", "--serve", "--write"}; command.push_back("");
} else { } else {
pgmName = "ssh"; command.splice(command.end(), extraStoreArgs(machine->sshName));
auto sshName = machine->sshName;
Strings extraArgs = extraStoreArgs(sshName);
argv = {"ssh", sshName};
if (machine->sshKey != "") append(argv, {"-i", machine->sshKey});
if (machine->sshPublicHostKey != "") {
Path fileName = tmpDir + "/host-key";
auto p = machine->sshName.find("@");
std::string host = p != std::string::npos ? std::string(machine->sshName, p + 1) : machine->sshName;
writeFile(fileName, host + " " + machine->sshPublicHostKey + "\n");
append(argv, {"-oUserKnownHostsFile=" + fileName});
}
append(argv,
{ "-x", "-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes"
, "--", "nix-store", "--serve", "--write" });
append(argv, extraArgs);
} }
child.sshPid = startProcess([&]() { return master.startCommand(std::move(command), {
restoreProcessContext(); "-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes"
if (dup2(to.readSide.get(), STDIN_FILENO) == -1)
throw SysError("cannot dup input pipe to stdin");
if (dup2(from.writeSide.get(), STDOUT_FILENO) == -1)
throw SysError("cannot dup output pipe to stdout");
if (dup2(stderrFD, STDERR_FILENO) == -1)
throw SysError("cannot dup stderr");
execvp(argv.front().c_str(), (char * *) stringsToCharPtrs(argv).data()); // FIXME: remove cast
throw SysError("cannot start %s", pgmName);
}); });
to.readSide = -1;
from.writeSide = -1;
child.in = to.writeSide.release();
child.out = from.readSide.release();
} }
@ -117,13 +74,10 @@ static void copyClosureTo(
garbage-collect paths that are already there. Optionally, ask garbage-collect paths that are already there. Optionally, ask
the remote host to substitute missing paths. */ the remote host to substitute missing paths. */
// FIXME: substitute output pollutes our build log // FIXME: substitute output pollutes our build log
conn.to << ServeProto::Command::QueryValidPaths << 1 << useSubstitutes;
ServeProto::write(destStore, conn, closure);
conn.to.flush();
/* Get back the set of paths that are already valid on the remote /* Get back the set of paths that are already valid on the remote
host. */ host. */
auto present = ServeProto::Serialise<StorePathSet>::read(destStore, conn); auto present = conn.queryValidPaths(
destStore, true, closure, useSubstitutes);
if (present.size() == closure.size()) return; if (present.size() == closure.size()) return;
@ -148,7 +102,7 @@ static void copyClosureTo(
// FIXME: use Store::topoSortPaths(). // FIXME: use Store::topoSortPaths().
static StorePaths reverseTopoSortPaths(const std::map<StorePath, ValidPathInfo> & paths) static StorePaths reverseTopoSortPaths(const std::map<StorePath, UnkeyedValidPathInfo> & paths)
{ {
StorePaths sorted; StorePaths sorted;
StorePathSet visited; StorePathSet visited;
@ -189,28 +143,6 @@ static std::pair<Path, AutoCloseFD> openLogFile(const std::string & logDir, cons
return {std::move(logFile), std::move(logFD)}; return {std::move(logFile), std::move(logFD)};
} }
/**
* @param conn is not fully initialized; it is this functions job to set
* the `remoteVersion` field after the handshake is completed.
* Therefore, no `ServeProto::Serialize` functions can be used until
* that field is set.
*/
static void handshake(::Machine::Connection & conn, unsigned int repeats)
{
conn.to << SERVE_MAGIC_1 << 0x206;
conn.to.flush();
unsigned int magic = readInt(conn.from);
if (magic != SERVE_MAGIC_2)
throw Error("protocol mismatch with nix-store --serve on %1%", conn.machine->sshName);
conn.remoteVersion = readInt(conn.from);
// Now `conn` is initialized.
if (GET_PROTOCOL_MAJOR(conn.remoteVersion) != 0x200)
throw Error("unsupported nix-store --serve protocol version on %1%", conn.machine->sshName);
if (GET_PROTOCOL_MINOR(conn.remoteVersion) < 3 && repeats > 0)
throw Error("machine %1% does not support repeating a build; please upgrade it to Nix 1.12", conn.machine->sshName);
}
static BasicDerivation sendInputs( static BasicDerivation sendInputs(
State & state, State & state,
Step & step, Step & step,
@ -281,21 +213,11 @@ static BuildResult performBuild(
Store & localStore, Store & localStore,
StorePath drvPath, StorePath drvPath,
const BasicDerivation & drv, const BasicDerivation & drv,
const State::BuildOptions & options, const ServeProto::BuildOptions & options,
counter & nrStepsBuilding counter & nrStepsBuilding
) )
{ {
conn.to << ServeProto::Command::BuildDerivation << localStore.printStorePath(drvPath); conn.putBuildDerivationRequest(localStore, drvPath, drv, options);
writeDerivation(conn.to, localStore, drv);
conn.to << options.maxSilentTime << options.buildTimeout;
if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 2)
conn.to << options.maxLogSize;
if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 3) {
conn.to
<< options.repeats // == build-repeat
<< options.enforceDeterminism;
}
conn.to.flush();
BuildResult result; BuildResult result;
@ -345,7 +267,7 @@ static BuildResult performBuild(
return result; return result;
} }
static std::map<StorePath, ValidPathInfo> queryPathInfos( static std::map<StorePath, UnkeyedValidPathInfo> queryPathInfos(
::Machine::Connection & conn, ::Machine::Connection & conn,
Store & localStore, Store & localStore,
StorePathSet & outputs, StorePathSet & outputs,
@ -354,30 +276,18 @@ static std::map<StorePath, ValidPathInfo> queryPathInfos(
{ {
/* Get info about each output path. */ /* Get info about each output path. */
std::map<StorePath, ValidPathInfo> infos; std::map<StorePath, UnkeyedValidPathInfo> infos;
conn.to << ServeProto::Command::QueryPathInfos; conn.to << ServeProto::Command::QueryPathInfos;
ServeProto::write(localStore, conn, outputs); ServeProto::write(localStore, conn, outputs);
conn.to.flush(); conn.to.flush();
while (true) { while (true) {
auto storePathS = readString(conn.from); auto storePathS = readString(conn.from);
if (storePathS == "") break; if (storePathS == "") break;
auto deriver = readString(conn.from); // deriver
auto references = ServeProto::Serialise<StorePathSet>::read(localStore, conn); auto storePath = localStore.parseStorePath(storePathS);
readLongLong(conn.from); // download size auto info = ServeProto::Serialise<UnkeyedValidPathInfo>::read(localStore, conn);
auto narSize = readLongLong(conn.from);
auto narHash = Hash::parseAny(readString(conn.from), htSHA256);
auto ca = ContentAddress::parseOpt(readString(conn.from));
readStrings<StringSet>(conn.from); // sigs
ValidPathInfo info(localStore.parseStorePath(storePathS), narHash);
assert(outputs.count(info.path));
info.references = references;
info.narSize = narSize;
totalNarSize += info.narSize; totalNarSize += info.narSize;
info.narHash = narHash; infos.insert_or_assign(std::move(storePath), std::move(info));
info.ca = ca;
if (deriver != "")
info.deriver = localStore.parseStorePath(deriver);
infos.insert_or_assign(info.path, info);
} }
return infos; return infos;
@ -418,14 +328,16 @@ static void copyPathsFromRemote(
NarMemberDatas & narMembers, NarMemberDatas & narMembers,
Store & localStore, Store & localStore,
Store & destStore, Store & destStore,
const std::map<StorePath, ValidPathInfo> & infos const std::map<StorePath, UnkeyedValidPathInfo> & infos
) )
{ {
auto pathsSorted = reverseTopoSortPaths(infos); auto pathsSorted = reverseTopoSortPaths(infos);
for (auto & path : pathsSorted) { for (auto & path : pathsSorted) {
auto & info = infos.find(path)->second; auto & info = infos.find(path)->second;
copyPathFromRemote(conn, narMembers, localStore, destStore, info); copyPathFromRemote(
conn, narMembers, localStore, destStore,
ValidPathInfo { path, info });
} }
} }
@ -492,7 +404,7 @@ void RemoteResult::updateWithBuildResult(const nix::BuildResult & buildResult)
void State::buildRemote(ref<Store> destStore, void State::buildRemote(ref<Store> destStore,
::Machine::ptr machine, Step::ptr step, ::Machine::ptr machine, Step::ptr step,
const BuildOptions & buildOptions, const ServeProto::BuildOptions & buildOptions,
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep, RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,
std::function<void(StepState)> updateStep, std::function<void(StepState)> updateStep,
NarMemberDatas & narMembers) NarMemberDatas & narMembers)
@ -503,21 +415,26 @@ void State::buildRemote(ref<Store> destStore,
AutoDelete logFileDel(logFile, false); AutoDelete logFileDel(logFile, false);
result.logFile = logFile; result.logFile = logFile;
nix::Path tmpDir = createTempDir();
AutoDelete tmpDirDel(tmpDir, true);
try { try {
updateStep(ssConnecting); updateStep(ssConnecting);
SSHMaster master {
machine->sshName,
machine->sshKey,
machine->sshPublicHostKey,
false, // no SSH master yet
false, // no compression yet
logFD.get(),
};
// FIXME: rewrite to use Store. // FIXME: rewrite to use Store.
SSHMaster::Connection child; auto child = build_remote::openConnection(machine, master);
build_remote::openConnection(machine, tmpDir, logFD.get(), child);
{ {
auto activeStepState(activeStep->state_.lock()); auto activeStepState(activeStep->state_.lock());
if (activeStepState->cancelled) throw Error("step cancelled"); if (activeStepState->cancelled) throw Error("step cancelled");
activeStepState->pid = child.sshPid; activeStepState->pid = child->sshPid;
} }
Finally clearPid([&]() { Finally clearPid([&]() {
@ -533,9 +450,13 @@ void State::buildRemote(ref<Store> destStore,
}); });
::Machine::Connection conn { ::Machine::Connection conn {
.from = child.out.get(), {
.to = child.in.get(), .to = child->in.get(),
.machine = machine, .from = child->out.get(),
/* Handshake. */
.remoteVersion = 0xdadbeef, // FIXME avoid dummy initialize
},
/*.machine =*/ machine,
}; };
Finally updateStats([&]() { Finally updateStats([&]() {
@ -543,14 +464,26 @@ void State::buildRemote(ref<Store> destStore,
bytesSent += conn.to.written; bytesSent += conn.to.written;
}); });
constexpr ServeProto::Version our_version = 0x206;
try { try {
build_remote::handshake(conn, buildOptions.repeats); conn.remoteVersion = decltype(conn)::handshake(
conn.to,
conn.from,
our_version,
machine->sshName);
} catch (EndOfFile & e) { } catch (EndOfFile & e) {
child.sshPid.wait(); child->sshPid.wait();
std::string s = chomp(readFile(result.logFile)); std::string s = chomp(readFile(result.logFile));
throw Error("cannot connect to %1%: %2%", machine->sshName, s); throw Error("cannot connect to %1%: %2%", machine->sshName, s);
} }
// Do not attempt to speak a newer version of the protocol.
//
// Per https://github.com/NixOS/nix/issues/9584 should be handled as
// part of `handshake` in upstream nix.
conn.remoteVersion = std::min(conn.remoteVersion, our_version);
{ {
auto info(machine->state->connectInfo.lock()); auto info(machine->state->connectInfo.lock());
info->consecutiveFailures = 0; info->consecutiveFailures = 0;
@ -653,8 +586,8 @@ void State::buildRemote(ref<Store> destStore,
} }
/* Shut down the connection. */ /* Shut down the connection. */
child.in = -1; child->in = -1;
child.sshPid.wait(); child->sshPid.wait();
} catch (Error & e) { } catch (Error & e) {
/* Disable this machine until a certain period of time has /* Disable this machine until a certain period of time has

View file

@ -98,10 +98,13 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
it). */ it). */
BuildID buildId; BuildID buildId;
std::optional<StorePath> buildDrvPath; std::optional<StorePath> buildDrvPath;
BuildOptions buildOptions; // Other fields set below
buildOptions.repeats = step->isDeterministic ? 1 : 0; nix::ServeProto::BuildOptions buildOptions {
buildOptions.maxLogSize = maxLogSize; .maxLogSize = maxLogSize,
buildOptions.enforceDeterminism = step->isDeterministic; .nrRepeats = step->isDeterministic ? 1u : 0u,
.enforceDeterminism = step->isDeterministic,
.keepFailed = false,
};
auto conn(dbPool.get()); auto conn(dbPool.get());
@ -136,7 +139,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
{ {
auto i = jobsetRepeats.find(std::make_pair(build2->projectName, build2->jobsetName)); auto i = jobsetRepeats.find(std::make_pair(build2->projectName, build2->jobsetName));
if (i != jobsetRepeats.end()) if (i != jobsetRepeats.end())
buildOptions.repeats = std::max(buildOptions.repeats, i->second); buildOptions.nrRepeats = std::max(buildOptions.nrRepeats, i->second);
} }
} }
if (!build) build = *dependents.begin(); if (!build) build = *dependents.begin();
@ -147,7 +150,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
buildOptions.buildTimeout = build->buildTimeout; buildOptions.buildTimeout = build->buildTimeout;
printInfo("performing step %s %d times on %s (needed by build %d and %d others)", printInfo("performing step %s %d times on %s (needed by build %d and %d others)",
localStore->printStorePath(step->drvPath), buildOptions.repeats + 1, machine->sshName, buildId, (dependents.size() - 1)); localStore->printStorePath(step->drvPath), buildOptions.nrRepeats + 1, machine->sshName, buildId, (dependents.size() - 1));
} }
if (!buildOneDone) if (!buildOneDone)

View file

@ -231,11 +231,11 @@ system_time State::doDispatch()
sort(machinesSorted.begin(), machinesSorted.end(), sort(machinesSorted.begin(), machinesSorted.end(),
[](const MachineInfo & a, const MachineInfo & b) -> bool [](const MachineInfo & a, const MachineInfo & b) -> bool
{ {
float ta = std::round(a.currentJobs / a.machine->speedFactorFloat); float ta = std::round(a.currentJobs / a.machine->speedFactor);
float tb = std::round(b.currentJobs / b.machine->speedFactorFloat); float tb = std::round(b.currentJobs / b.machine->speedFactor);
return return
ta != tb ? ta < tb : ta != tb ? ta < tb :
a.machine->speedFactorFloat != b.machine->speedFactorFloat ? a.machine->speedFactorFloat > b.machine->speedFactorFloat : a.machine->speedFactor != b.machine->speedFactor ? a.machine->speedFactor > b.machine->speedFactor :
a.currentJobs > b.currentJobs; a.currentJobs > b.currentJobs;
}); });

View file

@ -155,16 +155,16 @@ void State::parseMachines(const std::string & contents)
auto machine = std::make_shared<::Machine>(nix::Machine { auto machine = std::make_shared<::Machine>(nix::Machine {
// `storeUri`, not yet used // `storeUri`, not yet used
"", "",
// `systemTypes`, not yet used // `systemTypes`
{}, tokenizeString<StringSet>(tokens[1], ","),
// `sshKey` // `sshKey`
tokens[2] == "-" ? "" : tokens[2], tokens[2] == "-" ? "" : tokens[2],
// `maxJobs` // `maxJobs`
tokens[3] != "" tokens[3] != ""
? string2Int<MaxJobs>(tokens[3]).value() ? string2Int<MaxJobs>(tokens[3]).value()
: 1, : 1,
// `speedFactor`, not yet used // `speedFactor`
1, atof(tokens[4].c_str()),
// `supportedFeatures` // `supportedFeatures`
std::move(supportedFeatures), std::move(supportedFeatures),
// `mandatoryFeatures` // `mandatoryFeatures`
@ -176,8 +176,6 @@ void State::parseMachines(const std::string & contents)
}); });
machine->sshName = tokens[0]; machine->sshName = tokens[0];
machine->systemTypesSet = tokenizeString<StringSet>(tokens[1], ",");
machine->speedFactorFloat = atof(tokens[4].c_str());
/* Re-use the State object of the previous machine with the /* Re-use the State object of the previous machine with the
same name. */ same name. */
@ -641,7 +639,7 @@ void State::dumpStatus(Connection & conn)
json machine = { json machine = {
{"enabled", m->enabled}, {"enabled", m->enabled},
{"systemTypes", m->systemTypesSet}, {"systemTypes", m->systemTypes},
{"supportedFeatures", m->supportedFeatures}, {"supportedFeatures", m->supportedFeatures},
{"mandatoryFeatures", m->mandatoryFeatures}, {"mandatoryFeatures", m->mandatoryFeatures},
{"nrStepsDone", s->nrStepsDone.load()}, {"nrStepsDone", s->nrStepsDone.load()},

View file

@ -6,7 +6,46 @@
using namespace nix; using namespace nix;
struct Extractor : ParseSink
struct NarMemberConstructor : CreateRegularFileSink
{
NarMemberData & curMember;
HashSink hashSink = HashSink { HashAlgorithm::SHA256 };
std::optional<uint64_t> expectedSize;
NarMemberConstructor(NarMemberData & curMember)
: curMember(curMember)
{ }
void isExecutable() override
{
}
void preallocateContents(uint64_t size) override
{
expectedSize = size;
}
void operator () (std::string_view data) override
{
assert(expectedSize);
*curMember.fileSize += data.size();
hashSink(data);
if (curMember.contents) {
curMember.contents->append(data);
}
assert(curMember.fileSize <= expectedSize);
if (curMember.fileSize == expectedSize) {
auto [hash, len] = hashSink.finish();
assert(curMember.fileSize == len);
curMember.sha256 = hash;
}
}
};
struct Extractor : FileSystemObjectSink
{ {
std::unordered_set<Path> filesToKeep { std::unordered_set<Path> filesToKeep {
"/nix-support/hydra-build-products", "/nix-support/hydra-build-products",
@ -15,7 +54,6 @@ struct Extractor : ParseSink
}; };
NarMemberDatas & members; NarMemberDatas & members;
NarMemberData * curMember = nullptr;
Path prefix; Path prefix;
Extractor(NarMemberDatas & members, const Path & prefix) Extractor(NarMemberDatas & members, const Path & prefix)
@ -27,53 +65,22 @@ struct Extractor : ParseSink
members.insert_or_assign(prefix + path, NarMemberData { .type = SourceAccessor::Type::tDirectory }); members.insert_or_assign(prefix + path, NarMemberData { .type = SourceAccessor::Type::tDirectory });
} }
void createRegularFile(const Path & path) override void createRegularFile(const Path & path, std::function<void(CreateRegularFileSink &)> func) override
{ {
curMember = &members.insert_or_assign(prefix + path, NarMemberData { NarMemberConstructor nmc {
members.insert_or_assign(prefix + path, NarMemberData {
.type = SourceAccessor::Type::tRegular, .type = SourceAccessor::Type::tRegular,
.fileSize = 0, .fileSize = 0,
.contents = filesToKeep.count(path) ? std::optional("") : std::nullopt, .contents = filesToKeep.count(path) ? std::optional("") : std::nullopt,
}).first->second; }).first->second,
} };
func(nmc);
std::optional<uint64_t> expectedSize;
std::unique_ptr<HashSink> hashSink;
void preallocateContents(uint64_t size) override
{
expectedSize = size;
hashSink = std::make_unique<HashSink>(htSHA256);
}
void receiveContents(std::string_view data) override
{
assert(expectedSize);
assert(curMember);
assert(hashSink);
*curMember->fileSize += data.size();
(*hashSink)(data);
if (curMember->contents) {
curMember->contents->append(data);
}
assert(curMember->fileSize <= expectedSize);
if (curMember->fileSize == expectedSize) {
auto [hash, len] = hashSink->finish();
assert(curMember->fileSize == len);
curMember->sha256 = hash;
hashSink.reset();
}
} }
void createSymlink(const Path & path, const std::string & target) override void createSymlink(const Path & path, const std::string & target) override
{ {
members.insert_or_assign(prefix + path, NarMemberData { .type = SourceAccessor::Type::tSymlink }); members.insert_or_assign(prefix + path, NarMemberData { .type = SourceAccessor::Type::tSymlink });
} }
void isExecutable() override
{ }
void closeRegularFile() override
{ }
}; };

View file

@ -696,7 +696,7 @@ BuildOutput State::getBuildOutputCached(Connection & conn, nix::ref<nix::Store>
product.fileSize = row[2].as<off_t>(); product.fileSize = row[2].as<off_t>();
} }
if (!row[3].is_null()) if (!row[3].is_null())
product.sha256hash = Hash::parseAny(row[3].as<std::string>(), htSHA256); product.sha256hash = Hash::parseAny(row[3].as<std::string>(), HashAlgorithm::SHA256);
if (!row[4].is_null()) if (!row[4].is_null())
product.path = row[4].as<std::string>(); product.path = row[4].as<std::string>();
product.name = row[5].as<std::string>(); product.name = row[5].as<std::string>();

View file

@ -22,6 +22,7 @@
#include "sync.hh" #include "sync.hh"
#include "nar-extractor.hh" #include "nar-extractor.hh"
#include "serve-protocol.hh" #include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "machines.hh" #include "machines.hh"
@ -243,14 +244,6 @@ struct Machine : nix::Machine
we are not yet used to, but once we are, we don't need this. */ we are not yet used to, but once we are, we don't need this. */
std::string sshName; std::string sshName;
/* TODO Get rid once `nix::Machine::systemTypes` is a set not
vector. */
std::set<std::string> systemTypesSet;
/* TODO Get rid once `nix::Machine::systemTypes` is a `float` not
an `int`. */
float speedFactorFloat = 1.0;
struct State { struct State {
typedef std::shared_ptr<State> ptr; typedef std::shared_ptr<State> ptr;
counter currentJobs{0}; counter currentJobs{0};
@ -277,7 +270,7 @@ struct Machine : nix::Machine
{ {
/* Check that this machine is of the type required by the /* Check that this machine is of the type required by the
step. */ step. */
if (!systemTypesSet.count(step->drv->platform == "builtin" ? nix::settings.thisSystem : step->drv->platform)) if (!systemTypes.count(step->drv->platform == "builtin" ? nix::settings.thisSystem : step->drv->platform))
return false; return false;
/* Check that the step requires all mandatory features of this /* Check that the step requires all mandatory features of this
@ -307,29 +300,9 @@ struct Machine : nix::Machine
} }
// A connection to a machine // A connection to a machine
struct Connection { struct Connection : nix::ServeProto::BasicClientConnection {
nix::FdSource from;
nix::FdSink to;
nix::ServeProto::Version remoteVersion;
// Backpointer to the machine // Backpointer to the machine
ptr machine; ptr machine;
operator nix::ServeProto::ReadConn ()
{
return {
.from = from,
.version = remoteVersion,
};
}
operator nix::ServeProto::WriteConn ()
{
return {
.to = to,
.version = remoteVersion,
};
}
}; };
}; };
@ -464,7 +437,7 @@ private:
/* How often the build steps of a jobset should be repeated in /* How often the build steps of a jobset should be repeated in
order to detect non-determinism. */ order to detect non-determinism. */
std::map<std::pair<std::string, std::string>, unsigned int> jobsetRepeats; std::map<std::pair<std::string, std::string>, size_t> jobsetRepeats;
bool uploadLogsToBinaryCache; bool uploadLogsToBinaryCache;
@ -493,12 +466,6 @@ private:
public: public:
State(std::optional<std::string> metricsAddrOpt); State(std::optional<std::string> metricsAddrOpt);
struct BuildOptions {
unsigned int maxSilentTime, buildTimeout, repeats;
size_t maxLogSize;
bool enforceDeterminism;
};
private: private:
nix::MaintainCount<counter> startDbUpdate(); nix::MaintainCount<counter> startDbUpdate();
@ -583,7 +550,7 @@ private:
void buildRemote(nix::ref<nix::Store> destStore, void buildRemote(nix::ref<nix::Store> destStore,
Machine::ptr machine, Step::ptr step, Machine::ptr machine, Step::ptr step,
const BuildOptions & buildOptions, const nix::ServeProto::BuildOptions & buildOptions,
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep, RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,
std::function<void(StepState)> updateStep, std::function<void(StepState)> updateStep,
NarMemberDatas & narMembers); NarMemberDatas & narMembers);