forked from lix-project/hydra
Merge pull request #795 from NixOS/no-nar-accessors
Stream NARs from the remote to the destination store
This commit is contained in:
commit
858eb41fab
|
@ -311,7 +311,6 @@ Content-Type: application/json
|
|||
"buildproducts": {
|
||||
"1": {
|
||||
"path": "/nix/store/lzrxkjc35mhp8w7r8h82g0ljyizfchma-vm-test-run-unnamed",
|
||||
"sha1hash": null,
|
||||
"defaultpath": "log.html",
|
||||
"type": "report",
|
||||
"sha256hash": null,
|
||||
|
|
|
@ -5,11 +5,11 @@
|
|||
"nixpkgs": "nixpkgs"
|
||||
},
|
||||
"locked": {
|
||||
"lastModified": 1594125537,
|
||||
"narHash": "sha256-M801IExREv1T9F+K6YcCFERBFZ3+6ShwzAR2K7xvExA=",
|
||||
"lastModified": 1595761776,
|
||||
"narHash": "sha256-Y678b0XLyYKqF98bpsmZzrnMz0W7hwGSEkZxDURpMFE=",
|
||||
"owner": "NixOS",
|
||||
"repo": "nix",
|
||||
"rev": "1ab9da915422405452118ebb17b88cdfc90b1e10",
|
||||
"rev": "d7c0f094cbcfe1ae4ccc3d54baec00b66ccb1ed0",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
|
|
|
@ -672,10 +672,6 @@ components:
|
|||
name:
|
||||
description: Name of the file
|
||||
type: string
|
||||
sha1hash:
|
||||
nullable: true
|
||||
description: sha1 hash of the file
|
||||
type: string
|
||||
path:
|
||||
description: the nix store path
|
||||
type: string
|
||||
|
@ -863,7 +859,6 @@ components:
|
|||
defaultpath: ''
|
||||
name: hello-2.10
|
||||
type: nix-build
|
||||
sha1hash: null
|
||||
sha256hash: null
|
||||
subtype: ''
|
||||
path: /nix/store/y26qxcq1gg2hrqpxdc58b2fghv2bhxjg-hello-2.10
|
||||
|
|
|
@ -2,6 +2,7 @@ bin_PROGRAMS = hydra-queue-runner
|
|||
|
||||
hydra_queue_runner_SOURCES = hydra-queue-runner.cc queue-monitor.cc dispatcher.cc \
|
||||
builder.cc build-result.cc build-remote.cc \
|
||||
build-result.hh counter.hh token-server.hh state.hh db.hh
|
||||
build-result.hh counter.hh state.hh db.hh \
|
||||
nar-extractor.cc nar-extractor.hh
|
||||
hydra_queue_runner_LDADD = $(NIX_LIBS) -lpqxx
|
||||
hydra_queue_runner_CXXFLAGS = $(NIX_CFLAGS) -Wall -I ../libhydra -Wno-deprecated-declarations
|
||||
|
|
|
@ -124,11 +124,42 @@ static void copyClosureTo(std::timed_mutex & sendMutex, ref<Store> destStore,
|
|||
}
|
||||
|
||||
|
||||
// FIXME: use Store::topoSortPaths().
|
||||
StorePaths reverseTopoSortPaths(const std::map<StorePath, ValidPathInfo> & paths)
|
||||
{
|
||||
StorePaths sorted;
|
||||
StorePathSet visited;
|
||||
|
||||
std::function<void(const StorePath & path)> dfsVisit;
|
||||
|
||||
dfsVisit = [&](const StorePath & path) {
|
||||
if (!visited.insert(path).second) return;
|
||||
|
||||
auto info = paths.find(path);
|
||||
auto references = info == paths.end() ? StorePathSet() : info->second.references;
|
||||
|
||||
for (auto & i : references)
|
||||
/* Don't traverse into paths that don't exist. That can
|
||||
happen due to substitutes for non-existent paths. */
|
||||
if (i != path && paths.count(i))
|
||||
dfsVisit(i);
|
||||
|
||||
sorted.push_back(path);
|
||||
};
|
||||
|
||||
for (auto & i : paths)
|
||||
dfsVisit(i.first);
|
||||
|
||||
return sorted;
|
||||
}
|
||||
|
||||
|
||||
void State::buildRemote(ref<Store> destStore,
|
||||
Machine::ptr machine, Step::ptr step,
|
||||
unsigned int maxSilentTime, unsigned int buildTimeout, unsigned int repeats,
|
||||
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,
|
||||
std::function<void(StepState)> updateStep)
|
||||
std::function<void(StepState)> updateStep,
|
||||
NarMemberDatas & narMembers)
|
||||
{
|
||||
assert(BuildResult::TimedOut == 8);
|
||||
|
||||
|
@ -148,6 +179,7 @@ void State::buildRemote(ref<Store> destStore,
|
|||
|
||||
updateStep(ssConnecting);
|
||||
|
||||
// FIXME: rewrite to use Store.
|
||||
Child child;
|
||||
openConnection(machine, tmpDir, logFD.get(), child);
|
||||
|
||||
|
@ -182,7 +214,7 @@ void State::buildRemote(ref<Store> destStore,
|
|||
unsigned int remoteVersion;
|
||||
|
||||
try {
|
||||
to << SERVE_MAGIC_1 << 0x203;
|
||||
to << SERVE_MAGIC_1 << 0x204;
|
||||
to.flush();
|
||||
|
||||
unsigned int magic = readInt(from);
|
||||
|
@ -394,8 +426,6 @@ void State::buildRemote(ref<Store> destStore,
|
|||
}
|
||||
|
||||
/* Copy the output paths. */
|
||||
result.accessor = destStore->getFSAccessor();
|
||||
|
||||
if (!machine->isLocalhost() || localStore != std::shared_ptr<Store>(destStore)) {
|
||||
updateStep(ssReceivingOutputs);
|
||||
|
||||
|
@ -405,17 +435,26 @@ void State::buildRemote(ref<Store> destStore,
|
|||
|
||||
auto outputs = step->drv->outputPaths();
|
||||
|
||||
/* Query the size of the output paths. */
|
||||
/* Get info about each output path. */
|
||||
std::map<StorePath, ValidPathInfo> infos;
|
||||
size_t totalNarSize = 0;
|
||||
to << cmdQueryPathInfos;
|
||||
writeStorePaths(*localStore, to, outputs);
|
||||
to.flush();
|
||||
while (true) {
|
||||
if (readString(from) == "") break;
|
||||
auto storePathS = readString(from);
|
||||
if (storePathS == "") break;
|
||||
ValidPathInfo info(localStore->parseStorePath(storePathS));
|
||||
assert(outputs.count(info.path));
|
||||
readString(from); // deriver
|
||||
readStrings<PathSet>(from); // references
|
||||
info.references = readStorePaths<StorePathSet>(*localStore, from);
|
||||
readLongLong(from); // download size
|
||||
totalNarSize += readLongLong(from);
|
||||
info.narSize = readLongLong(from);
|
||||
totalNarSize += info.narSize;
|
||||
info.narHash = Hash(readString(from), htSHA256);
|
||||
info.ca = parseContentAddressOpt(readString(from));
|
||||
readStrings<StringSet>(from); // sigs
|
||||
infos.insert_or_assign(info.path, info);
|
||||
}
|
||||
|
||||
if (totalNarSize > maxOutputSize) {
|
||||
|
@ -423,33 +462,28 @@ void State::buildRemote(ref<Store> destStore,
|
|||
return;
|
||||
}
|
||||
|
||||
/* Copy each path. */
|
||||
printMsg(lvlDebug, "copying outputs of ‘%s’ from ‘%s’ (%d bytes)",
|
||||
localStore->printStorePath(step->drvPath), machine->sshName, totalNarSize);
|
||||
|
||||
/* Block until we have the required amount of memory
|
||||
available, which is twice the NAR size (namely the
|
||||
uncompressed and worst-case compressed NAR), plus 150
|
||||
MB for xz compression overhead. (The xz manpage claims
|
||||
~94 MiB, but that's not was I'm seeing.) */
|
||||
auto resStart = std::chrono::steady_clock::now();
|
||||
size_t compressionCost = totalNarSize + 150 * 1024 * 1024;
|
||||
result.tokens = std::make_unique<nix::TokenServer::Token>(memoryTokens.get(totalNarSize + compressionCost));
|
||||
auto resStop = std::chrono::steady_clock::now();
|
||||
auto pathsSorted = reverseTopoSortPaths(infos);
|
||||
|
||||
auto resMs = std::chrono::duration_cast<std::chrono::milliseconds>(resStop - resStart).count();
|
||||
if (resMs >= 1000)
|
||||
printMsg(lvlError, "warning: had to wait %d ms for %d memory tokens for %s",
|
||||
resMs, totalNarSize, localStore->printStorePath(step->drvPath));
|
||||
|
||||
to << cmdExportPaths << 0;
|
||||
writeStorePaths(*localStore, to, outputs);
|
||||
for (auto & path : pathsSorted) {
|
||||
auto & info = infos.find(path)->second;
|
||||
to << cmdDumpStorePath << localStore->printStorePath(path);
|
||||
to.flush();
|
||||
destStore->importPaths(from, result.accessor, NoCheckSigs);
|
||||
|
||||
/* Release the tokens pertaining to NAR
|
||||
compression. After this we only have the uncompressed
|
||||
NAR in memory. */
|
||||
result.tokens->give_back(compressionCost);
|
||||
/* Receive the NAR from the remote and add it to the
|
||||
destination store. Meanwhile, extract all the info from the
|
||||
NAR that getBuildOutput() needs. */
|
||||
auto source2 = sinkToSource([&](Sink & sink)
|
||||
{
|
||||
TeeSource tee(from, sink);
|
||||
extractNarData(tee, localStore->printStorePath(path), narMembers);
|
||||
});
|
||||
|
||||
destStore->addToStore(info, *source2);
|
||||
}
|
||||
|
||||
auto now2 = std::chrono::steady_clock::now();
|
||||
|
||||
|
|
|
@ -8,8 +8,10 @@
|
|||
using namespace nix;
|
||||
|
||||
|
||||
BuildOutput getBuildOutput(nix::ref<Store> store,
|
||||
nix::ref<nix::FSAccessor> accessor, const Derivation & drv)
|
||||
BuildOutput getBuildOutput(
|
||||
nix::ref<Store> store,
|
||||
NarMemberDatas & narMembers,
|
||||
const Derivation & drv)
|
||||
{
|
||||
BuildOutput res;
|
||||
|
||||
|
@ -24,6 +26,20 @@ BuildOutput getBuildOutput(nix::ref<Store> store,
|
|||
if (outputs.count(path)) res.size += info->narSize;
|
||||
}
|
||||
|
||||
/* Fetch missing data. Usually buildRemote() will have extracted
|
||||
this data from the incoming NARs. */
|
||||
for (auto & output : outputs) {
|
||||
auto outputS = store->printStorePath(output);
|
||||
if (!narMembers.count(outputS)) {
|
||||
printInfo("fetching NAR contents of '%s'...", outputS);
|
||||
auto source = sinkToSource([&](Sink & sink)
|
||||
{
|
||||
store->narFromPath(output, sink);
|
||||
});
|
||||
extractNarData(*source, outputS, narMembers);
|
||||
}
|
||||
}
|
||||
|
||||
/* Get build products. */
|
||||
bool explicitProducts = false;
|
||||
|
||||
|
@ -39,17 +55,18 @@ BuildOutput getBuildOutput(nix::ref<Store> store,
|
|||
for (auto & output : outputs) {
|
||||
auto outputS = store->printStorePath(output);
|
||||
|
||||
Path failedFile = outputS + "/nix-support/failed";
|
||||
if (accessor->stat(failedFile).type == FSAccessor::Type::tRegular)
|
||||
if (narMembers.count(outputS + "/nix-support/failed"))
|
||||
res.failed = true;
|
||||
|
||||
Path productsFile = outputS + "/nix-support/hydra-build-products";
|
||||
if (accessor->stat(productsFile).type != FSAccessor::Type::tRegular)
|
||||
auto productsFile = narMembers.find(outputS + "/nix-support/hydra-build-products");
|
||||
if (productsFile == narMembers.end() ||
|
||||
productsFile->second.type != FSAccessor::Type::tRegular)
|
||||
continue;
|
||||
assert(productsFile->second.contents);
|
||||
|
||||
explicitProducts = true;
|
||||
|
||||
for (auto & line : tokenizeString<Strings>(accessor->readFile(productsFile), "\n")) {
|
||||
for (auto & line : tokenizeString<Strings>(productsFile->second.contents.value(), "\n")) {
|
||||
BuildProduct product;
|
||||
|
||||
std::smatch match;
|
||||
|
@ -69,17 +86,15 @@ BuildOutput getBuildOutput(nix::ref<Store> store,
|
|||
product.path = canonPath(product.path);
|
||||
if (!store->isInStore(product.path)) continue;
|
||||
|
||||
auto st = accessor->stat(product.path);
|
||||
if (st.type == FSAccessor::Type::tMissing) continue;
|
||||
auto file = narMembers.find(product.path);
|
||||
if (file == narMembers.end()) continue;
|
||||
|
||||
product.name = product.path == store->printStorePath(output) ? "" : baseNameOf(product.path);
|
||||
|
||||
if (st.type == FSAccessor::Type::tRegular) {
|
||||
if (file->second.type == FSAccessor::Type::tRegular) {
|
||||
product.isRegular = true;
|
||||
product.fileSize = st.fileSize;
|
||||
auto contents = accessor->readFile(product.path);
|
||||
product.sha1hash = hashString(htSHA1, contents);
|
||||
product.sha256hash = hashString(htSHA256, contents);
|
||||
product.fileSize = file->second.fileSize.value();
|
||||
product.sha256hash = file->second.sha256.value();
|
||||
}
|
||||
|
||||
res.products.push_back(product);
|
||||
|
@ -96,29 +111,30 @@ BuildOutput getBuildOutput(nix::ref<Store> store,
|
|||
product.subtype = output.first == "out" ? "" : output.first;
|
||||
product.name = output.second.path.name();
|
||||
|
||||
auto st = accessor->stat(product.path);
|
||||
if (st.type == FSAccessor::Type::tMissing)
|
||||
throw Error("getting status of ‘%s’", product.path);
|
||||
if (st.type == FSAccessor::Type::tDirectory)
|
||||
auto file = narMembers.find(product.path);
|
||||
assert(file != narMembers.end());
|
||||
if (file->second.type == FSAccessor::Type::tDirectory)
|
||||
res.products.push_back(product);
|
||||
}
|
||||
}
|
||||
|
||||
/* Get the release name from $output/nix-support/hydra-release-name. */
|
||||
for (auto & output : outputs) {
|
||||
auto p = store->printStorePath(output) + "/nix-support/hydra-release-name";
|
||||
if (accessor->stat(p).type != FSAccessor::Type::tRegular) continue;
|
||||
try {
|
||||
res.releaseName = trim(accessor->readFile(p));
|
||||
} catch (Error & e) { continue; }
|
||||
auto file = narMembers.find(store->printStorePath(output) + "/nix-support/hydra-release-name");
|
||||
if (file == narMembers.end() ||
|
||||
file->second.type != FSAccessor::Type::tRegular)
|
||||
continue;
|
||||
res.releaseName = trim(file->second.contents.value());
|
||||
// FIXME: validate release name
|
||||
}
|
||||
|
||||
/* Get metrics. */
|
||||
for (auto & output : outputs) {
|
||||
auto metricsFile = store->printStorePath(output) + "/nix-support/hydra-metrics";
|
||||
if (accessor->stat(metricsFile).type != FSAccessor::Type::tRegular) continue;
|
||||
for (auto & line : tokenizeString<Strings>(accessor->readFile(metricsFile), "\n")) {
|
||||
auto file = narMembers.find(store->printStorePath(output) + "/nix-support/hydra-metrics");
|
||||
if (file == narMembers.end() ||
|
||||
file->second.type != FSAccessor::Type::tRegular)
|
||||
continue;
|
||||
for (auto & line : tokenizeString<Strings>(file->second.contents.value(), "\n")) {
|
||||
auto fields = tokenizeString<std::vector<std::string>>(line);
|
||||
if (fields.size() < 2) continue;
|
||||
BuildMetric metric;
|
||||
|
|
|
@ -5,13 +5,14 @@
|
|||
#include "hash.hh"
|
||||
#include "derivations.hh"
|
||||
#include "store-api.hh"
|
||||
#include "nar-extractor.hh"
|
||||
|
||||
struct BuildProduct
|
||||
{
|
||||
nix::Path path, defaultPath;
|
||||
std::string type, subtype, name;
|
||||
bool isRegular = false;
|
||||
nix::Hash sha1hash, sha256hash;
|
||||
nix::Hash sha256hash;
|
||||
off_t fileSize = 0;
|
||||
BuildProduct() { }
|
||||
};
|
||||
|
@ -38,5 +39,7 @@ struct BuildOutput
|
|||
std::map<std::string, BuildMetric> metrics;
|
||||
};
|
||||
|
||||
BuildOutput getBuildOutput(nix::ref<nix::Store> store,
|
||||
nix::ref<nix::FSAccessor> accessor, const nix::Derivation & drv);
|
||||
BuildOutput getBuildOutput(
|
||||
nix::ref<nix::Store> store,
|
||||
NarMemberDatas & narMembers,
|
||||
const nix::Derivation & drv);
|
||||
|
|
|
@ -201,11 +201,11 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
|
|||
};
|
||||
|
||||
/* Do the build. */
|
||||
NarMemberDatas narMembers;
|
||||
|
||||
try {
|
||||
/* FIXME: referring builds may have conflicting timeouts. */
|
||||
buildRemote(destStore, machine, step, maxSilentTime, buildTimeout, repeats, result, activeStep, updateStep);
|
||||
} catch (NoTokens & e) {
|
||||
result.stepStatus = bsNarSizeLimitExceeded;
|
||||
buildRemote(destStore, machine, step, maxSilentTime, buildTimeout, repeats, result, activeStep, updateStep, narMembers);
|
||||
} catch (Error & e) {
|
||||
if (activeStep->state_.lock()->cancelled) {
|
||||
printInfo("marking step %d of build %d as cancelled", stepNr, buildId);
|
||||
|
@ -220,11 +220,8 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
|
|||
|
||||
if (result.stepStatus == bsSuccess) {
|
||||
updateStep(ssPostProcessing);
|
||||
res = getBuildOutput(destStore, ref<FSAccessor>(result.accessor), *step->drv);
|
||||
res = getBuildOutput(destStore, narMembers, *step->drv);
|
||||
}
|
||||
|
||||
result.accessor = 0;
|
||||
result.tokens = 0;
|
||||
}
|
||||
|
||||
time_t stepStopTime = time(0);
|
||||
|
|
|
@ -30,13 +30,6 @@ template<> void toJSON<double>(std::ostream & str, const double & n) { str << n;
|
|||
}
|
||||
|
||||
|
||||
static uint64_t getMemSize()
|
||||
{
|
||||
auto pages = sysconf(_SC_PHYS_PAGES);
|
||||
return pages >= 0 ? pages * sysconf(_SC_PAGESIZE) : 4ULL << 30;
|
||||
}
|
||||
|
||||
|
||||
std::string getEnvOrDie(const std::string & key)
|
||||
{
|
||||
auto value = getEnv(key);
|
||||
|
@ -49,14 +42,11 @@ State::State()
|
|||
: config(std::make_unique<HydraConfig>())
|
||||
, maxUnsupportedTime(config->getIntOption("max_unsupported_time", 0))
|
||||
, dbPool(config->getIntOption("max_db_connections", 128))
|
||||
, memoryTokens(config->getIntOption("nar_buffer_size", getMemSize() / 2))
|
||||
, maxOutputSize(config->getIntOption("max_output_size", 2ULL << 30))
|
||||
, maxLogSize(config->getIntOption("max_log_size", 64ULL << 20))
|
||||
, uploadLogsToBinaryCache(config->getBoolOption("upload_logs_to_binary_cache", false))
|
||||
, rootsDir(config->getStrOption("gc_roots_dir", fmt("%s/gcroots/per-user/%s/hydra-roots", settings.nixStateDir, getEnvOrDie("LOGNAME"))))
|
||||
{
|
||||
debug("using %d bytes for the NAR buffer", memoryTokens.capacity());
|
||||
|
||||
hydraData = getEnvOrDie("HYDRA_DATA");
|
||||
|
||||
logDir = canonPath(hydraData + "/build-logs");
|
||||
|
@ -417,13 +407,12 @@ void State::markSucceededBuild(pqxx::work & txn, Build::ptr build,
|
|||
unsigned int productNr = 1;
|
||||
for (auto & product : res.products) {
|
||||
txn.exec_params0
|
||||
("insert into BuildProducts (build, productnr, type, subtype, fileSize, sha1hash, sha256hash, path, name, defaultPath) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
|
||||
("insert into BuildProducts (build, productnr, type, subtype, fileSize, sha256hash, path, name, defaultPath) values ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
|
||||
build->id,
|
||||
productNr++,
|
||||
product.type,
|
||||
product.subtype,
|
||||
product.isRegular ? std::make_optional(product.fileSize) : std::nullopt,
|
||||
product.isRegular ? std::make_optional(product.sha1hash.to_string(Base16, false)) : std::nullopt,
|
||||
product.isRegular ? std::make_optional(product.sha256hash.to_string(Base16, false)) : std::nullopt,
|
||||
product.path,
|
||||
product.name,
|
||||
|
@ -544,7 +533,6 @@ void State::dumpStatus(Connection & conn)
|
|||
root.attr("dispatchTimeAvgMs", nrDispatcherWakeups == 0 ? 0.0 : (float) dispatchTimeMs / nrDispatcherWakeups);
|
||||
root.attr("nrDbConnections", dbPool.count());
|
||||
root.attr("nrActiveDbUpdates", nrActiveDbUpdates);
|
||||
root.attr("memoryTokensInUse", memoryTokens.currentUse());
|
||||
|
||||
{
|
||||
auto nested = root.object("machines");
|
||||
|
|
82
src/hydra-queue-runner/nar-extractor.cc
Normal file
82
src/hydra-queue-runner/nar-extractor.cc
Normal file
|
@ -0,0 +1,82 @@
|
|||
#include "nar-extractor.hh"
|
||||
|
||||
#include "archive.hh"
|
||||
|
||||
#include <unordered_set>
|
||||
|
||||
using namespace nix;
|
||||
|
||||
struct Extractor : ParseSink
|
||||
{
|
||||
std::unordered_set<Path> filesToKeep {
|
||||
"/nix-support/hydra-build-products",
|
||||
"/nix-support/hydra-release-name",
|
||||
"/nix-support/hydra-metrics",
|
||||
};
|
||||
|
||||
NarMemberDatas & members;
|
||||
NarMemberData * curMember = nullptr;
|
||||
Path prefix;
|
||||
|
||||
Extractor(NarMemberDatas & members, const Path & prefix)
|
||||
: members(members), prefix(prefix)
|
||||
{ }
|
||||
|
||||
void createDirectory(const Path & path) override
|
||||
{
|
||||
members.insert_or_assign(prefix + path, NarMemberData { .type = FSAccessor::Type::tDirectory });
|
||||
}
|
||||
|
||||
void createRegularFile(const Path & path) override
|
||||
{
|
||||
curMember = &members.insert_or_assign(prefix + path, NarMemberData {
|
||||
.type = FSAccessor::Type::tRegular,
|
||||
.fileSize = 0,
|
||||
.contents = filesToKeep.count(path) ? std::optional("") : std::nullopt,
|
||||
}).first->second;
|
||||
}
|
||||
|
||||
std::optional<unsigned long long> expectedSize;
|
||||
std::unique_ptr<HashSink> hashSink;
|
||||
|
||||
void preallocateContents(unsigned long long size) override
|
||||
{
|
||||
expectedSize = size;
|
||||
hashSink = std::make_unique<HashSink>(htSHA256);
|
||||
}
|
||||
|
||||
void receiveContents(unsigned char * data, unsigned int len) override
|
||||
{
|
||||
assert(expectedSize);
|
||||
assert(curMember);
|
||||
assert(hashSink);
|
||||
*curMember->fileSize += len;
|
||||
(*hashSink)(data, len);
|
||||
if (curMember->contents) {
|
||||
curMember->contents->append((char *) data, len);
|
||||
}
|
||||
assert(curMember->fileSize <= expectedSize);
|
||||
if (curMember->fileSize == expectedSize) {
|
||||
auto [hash, len] = hashSink->finish();
|
||||
assert(curMember->fileSize == len);
|
||||
curMember->sha256 = hash;
|
||||
hashSink.reset();
|
||||
}
|
||||
}
|
||||
|
||||
void createSymlink(const Path & path, const string & target) override
|
||||
{
|
||||
members.insert_or_assign(prefix + path, NarMemberData { .type = FSAccessor::Type::tSymlink });
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
void extractNarData(
|
||||
Source & source,
|
||||
const Path & prefix,
|
||||
NarMemberDatas & members)
|
||||
{
|
||||
Extractor extractor(members, prefix);
|
||||
parseDump(extractor, source);
|
||||
// Note: this point may not be reached if we're in a coroutine.
|
||||
}
|
23
src/hydra-queue-runner/nar-extractor.hh
Normal file
23
src/hydra-queue-runner/nar-extractor.hh
Normal file
|
@ -0,0 +1,23 @@
|
|||
#pragma once
|
||||
|
||||
#include "fs-accessor.hh"
|
||||
#include "types.hh"
|
||||
#include "serialise.hh"
|
||||
#include "hash.hh"
|
||||
|
||||
struct NarMemberData
|
||||
{
|
||||
nix::FSAccessor::Type type;
|
||||
std::optional<unsigned long long> fileSize;
|
||||
std::optional<std::string> contents;
|
||||
std::optional<nix::Hash> sha256;
|
||||
};
|
||||
|
||||
typedef std::map<nix::Path, NarMemberData> NarMemberDatas;
|
||||
|
||||
/* Read a NAR from a source and get to some info about every file
|
||||
inside the NAR. */
|
||||
void extractNarData(
|
||||
nix::Source & source,
|
||||
const nix::Path & prefix,
|
||||
NarMemberDatas & members);
|
|
@ -632,7 +632,7 @@ BuildOutput State::getBuildOutputCached(Connection & conn, nix::ref<nix::Store>
|
|||
res.size = r[0][4].is_null() ? 0 : r[0][4].as<unsigned long long>();
|
||||
|
||||
auto products = txn.exec_params
|
||||
("select type, subtype, fileSize, sha1hash, sha256hash, path, name, defaultPath from BuildProducts where build = $1 order by productnr",
|
||||
("select type, subtype, fileSize, sha256hash, path, name, defaultPath from BuildProducts where build = $1 order by productnr",
|
||||
id);
|
||||
|
||||
for (auto row : products) {
|
||||
|
@ -646,14 +646,12 @@ BuildOutput State::getBuildOutputCached(Connection & conn, nix::ref<nix::Store>
|
|||
product.fileSize = row[2].as<off_t>();
|
||||
}
|
||||
if (!row[3].is_null())
|
||||
product.sha1hash = Hash(row[3].as<std::string>(), htSHA1);
|
||||
product.sha256hash = Hash(row[3].as<std::string>(), htSHA256);
|
||||
if (!row[4].is_null())
|
||||
product.sha256hash = Hash(row[4].as<std::string>(), htSHA256);
|
||||
if (!row[5].is_null())
|
||||
product.path = row[5].as<std::string>();
|
||||
product.name = row[6].as<std::string>();
|
||||
if (!row[7].is_null())
|
||||
product.defaultPath = row[7].as<std::string>();
|
||||
product.path = row[4].as<std::string>();
|
||||
product.name = row[5].as<std::string>();
|
||||
if (!row[6].is_null())
|
||||
product.defaultPath = row[6].as<std::string>();
|
||||
res.products.emplace_back(product);
|
||||
}
|
||||
|
||||
|
@ -674,5 +672,6 @@ BuildOutput State::getBuildOutputCached(Connection & conn, nix::ref<nix::Store>
|
|||
|
||||
}
|
||||
|
||||
return getBuildOutput(destStore, destStore->getFSAccessor(), drv);
|
||||
NarMemberDatas narMembers;
|
||||
return getBuildOutput(destStore, narMembers, drv);
|
||||
}
|
||||
|
|
|
@ -8,13 +8,13 @@
|
|||
#include <queue>
|
||||
|
||||
#include "db.hh"
|
||||
#include "token-server.hh"
|
||||
|
||||
#include "parsed-derivations.hh"
|
||||
#include "pathlocks.hh"
|
||||
#include "pool.hh"
|
||||
#include "store-api.hh"
|
||||
#include "sync.hh"
|
||||
#include "nar-extractor.hh"
|
||||
|
||||
|
||||
typedef unsigned int BuildID;
|
||||
|
@ -65,8 +65,6 @@ struct RemoteResult
|
|||
time_t startTime = 0, stopTime = 0;
|
||||
unsigned int overhead = 0;
|
||||
nix::Path logFile;
|
||||
std::unique_ptr<nix::TokenServer::Token> tokens;
|
||||
std::shared_ptr<nix::FSAccessor> accessor;
|
||||
|
||||
BuildStatus buildStatus() const
|
||||
{
|
||||
|
@ -410,13 +408,6 @@ private:
|
|||
std::shared_ptr<nix::Store> localStore;
|
||||
std::shared_ptr<nix::Store> _destStore;
|
||||
|
||||
/* Token server to prevent threads from allocating too many big
|
||||
strings concurrently while importing NARs from the build
|
||||
machines. When a thread imports a NAR of size N, it will first
|
||||
acquire N memory tokens, causing it to block until that many
|
||||
tokens are available. */
|
||||
nix::TokenServer memoryTokens;
|
||||
|
||||
size_t maxOutputSize;
|
||||
size_t maxLogSize;
|
||||
|
||||
|
@ -527,7 +518,8 @@ private:
|
|||
unsigned int maxSilentTime, unsigned int buildTimeout,
|
||||
unsigned int repeats,
|
||||
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,
|
||||
std::function<void(StepState)> updateStep);
|
||||
std::function<void(StepState)> updateStep,
|
||||
NarMemberDatas & narMembers);
|
||||
|
||||
void markSucceededBuild(pqxx::work & txn, Build::ptr build,
|
||||
const BuildOutput & res, bool isCachedBuild, time_t startTime, time_t stopTime);
|
||||
|
|
|
@ -1,109 +0,0 @@
|
|||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
|
||||
#include "sync.hh"
|
||||
#include "types.hh"
|
||||
|
||||
namespace nix {
|
||||
|
||||
MakeError(NoTokens, Error);
|
||||
|
||||
/* This class hands out tokens. There are only ‘maxTokens’ tokens
|
||||
available. Calling get(N) will return a Token object, representing
|
||||
ownership of N tokens. If the requested number of tokens is
|
||||
unavailable, get() will sleep until another thread returns a
|
||||
token. */
|
||||
|
||||
class TokenServer
|
||||
{
|
||||
const size_t maxTokens;
|
||||
|
||||
Sync<size_t> inUse{0};
|
||||
std::condition_variable wakeup;
|
||||
|
||||
public:
|
||||
TokenServer(size_t maxTokens) : maxTokens(maxTokens) { }
|
||||
|
||||
class Token
|
||||
{
|
||||
friend TokenServer;
|
||||
|
||||
TokenServer * ts;
|
||||
|
||||
size_t tokens;
|
||||
|
||||
bool acquired = false;
|
||||
|
||||
Token(TokenServer * ts, size_t tokens, unsigned int timeout)
|
||||
: ts(ts), tokens(tokens)
|
||||
{
|
||||
if (tokens >= ts->maxTokens)
|
||||
throw NoTokens("requesting more tokens (%d) than exist (%d)", tokens, ts->maxTokens);
|
||||
debug("acquiring %d tokens", tokens);
|
||||
auto inUse(ts->inUse.lock());
|
||||
while (*inUse + tokens > ts->maxTokens)
|
||||
if (timeout) {
|
||||
if (!inUse.wait_for(ts->wakeup, std::chrono::seconds(timeout),
|
||||
[&]() { return *inUse + tokens <= ts->maxTokens; }))
|
||||
return;
|
||||
} else
|
||||
inUse.wait(ts->wakeup);
|
||||
*inUse += tokens;
|
||||
acquired = true;
|
||||
}
|
||||
|
||||
public:
|
||||
|
||||
Token(Token && t) : ts(t.ts), tokens(t.tokens), acquired(t.acquired)
|
||||
{
|
||||
t.ts = 0;
|
||||
t.acquired = false;
|
||||
}
|
||||
Token(const Token & l) = delete;
|
||||
|
||||
~Token()
|
||||
{
|
||||
if (!ts || !acquired) return;
|
||||
give_back(tokens);
|
||||
}
|
||||
|
||||
bool operator ()() { return acquired; }
|
||||
|
||||
void give_back(size_t t)
|
||||
{
|
||||
debug("returning %d tokens", t);
|
||||
if (!t) return;
|
||||
assert(acquired);
|
||||
assert(t <= tokens);
|
||||
{
|
||||
auto inUse(ts->inUse.lock());
|
||||
assert(*inUse >= t);
|
||||
*inUse -= t;
|
||||
tokens -= t;
|
||||
}
|
||||
// FIXME: inefficient. Should wake up waiters that can
|
||||
// proceed now.
|
||||
ts->wakeup.notify_all();
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
Token get(size_t tokens = 1, unsigned int timeout = 0)
|
||||
{
|
||||
return Token(this, tokens, timeout);
|
||||
}
|
||||
|
||||
size_t currentUse()
|
||||
{
|
||||
auto inUse_(inUse.lock());
|
||||
return *inUse_;
|
||||
}
|
||||
|
||||
size_t capacity()
|
||||
{
|
||||
return maxTokens;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
|
@ -90,7 +90,6 @@ sub buildFinished {
|
|||
type => $product->type,
|
||||
subtype => $product->subtype,
|
||||
fileSize => $product->filesize,
|
||||
sha1hash => $product->sha1hash,
|
||||
sha256hash => $product->sha256hash,
|
||||
path => $product->path,
|
||||
name => $product->name,
|
||||
|
|
|
@ -61,11 +61,6 @@ __PACKAGE__->table("buildproducts");
|
|||
data_type: 'bigint'
|
||||
is_nullable: 1
|
||||
|
||||
=head2 sha1hash
|
||||
|
||||
data_type: 'text'
|
||||
is_nullable: 1
|
||||
|
||||
=head2 sha256hash
|
||||
|
||||
data_type: 'text'
|
||||
|
@ -99,8 +94,6 @@ __PACKAGE__->add_columns(
|
|||
{ data_type => "text", is_nullable => 0 },
|
||||
"filesize",
|
||||
{ data_type => "bigint", is_nullable => 1 },
|
||||
"sha1hash",
|
||||
{ data_type => "text", is_nullable => 1 },
|
||||
"sha256hash",
|
||||
{ data_type => "text", is_nullable => 1 },
|
||||
"path",
|
||||
|
@ -143,8 +136,8 @@ __PACKAGE__->belongs_to(
|
|||
);
|
||||
|
||||
|
||||
# Created by DBIx::Class::Schema::Loader v0.07049 @ 2020-02-06 12:22:36
|
||||
# DO NOT MODIFY THIS OR ANYTHING ABOVE! md5sum:iI0gmKqQxiPBTy5QsM6tpQ
|
||||
# Created by DBIx::Class::Schema::Loader v0.07049 @ 2020-07-27 18:21:03
|
||||
# DO NOT MODIFY THIS OR ANYTHING ABOVE! md5sum:O4R8b/GukNaUmmAErb3Jlw
|
||||
|
||||
my %hint = (
|
||||
columns => [
|
||||
|
@ -152,7 +145,6 @@ my %hint = (
|
|||
'subtype',
|
||||
'name',
|
||||
'filesize',
|
||||
'sha1hash',
|
||||
'sha256hash',
|
||||
'path',
|
||||
'defaultpath'
|
||||
|
|
|
@ -191,7 +191,6 @@
|
|||
<table class="info-table">
|
||||
[% INCLUDE renderProductLinks %]
|
||||
<tr><th>File size:</th><td>[% product.filesize %] bytes ([% mibs(product.filesize / (1024 * 1024)) %] MiB)</td></tr>
|
||||
<tr><th>SHA-1 hash:</th><td><tt>[% product.sha1hash %]</tt></td></tr>
|
||||
<tr><th>SHA-256 hash:</th><td><tt>[% product.sha256hash %]</tt></td></tr>
|
||||
<tr><th>Full path:</th><td><tt>[% product.path %]</tt></td></tr>
|
||||
</table>
|
||||
|
|
|
@ -330,7 +330,6 @@ create table BuildProducts (
|
|||
type text not null, -- "nix-build", "file", "doc", "report", ...
|
||||
subtype text not null, -- "source-dist", "rpm", ...
|
||||
fileSize bigint,
|
||||
sha1hash text,
|
||||
sha256hash text,
|
||||
path text,
|
||||
name text not null, -- generally just the filename part of `path'
|
||||
|
|
Loading…
Reference in a new issue