forked from lix-project/hydra
Get data needed by getBuildOutput() from the incoming NAR in a streaming fashion
This commit is contained in:
parent
d4e4be4fd1
commit
5b4df3ad5a
9 changed files with 177 additions and 40 deletions
|
@ -1,7 +1,8 @@
|
||||||
bin_PROGRAMS = hydra-queue-runner
|
bin_PROGRAMS = hydra-queue-runner
|
||||||
|
|
||||||
hydra_queue_runner_SOURCES = hydra-queue-runner.cc queue-monitor.cc dispatcher.cc \
|
hydra_queue_runner_SOURCES = hydra-queue-runner.cc queue-monitor.cc dispatcher.cc \
|
||||||
builder.cc build-result.cc build-remote.cc \
|
builder.cc build-result.cc build-remote.cc \
|
||||||
build-result.hh counter.hh state.hh db.hh
|
build-result.hh counter.hh state.hh db.hh \
|
||||||
|
nar-extractor.cc nar-extractor.hh
|
||||||
hydra_queue_runner_LDADD = $(NIX_LIBS) -lpqxx
|
hydra_queue_runner_LDADD = $(NIX_LIBS) -lpqxx
|
||||||
hydra_queue_runner_CXXFLAGS = $(NIX_CFLAGS) -Wall -I ../libhydra -Wno-deprecated-declarations
|
hydra_queue_runner_CXXFLAGS = $(NIX_CFLAGS) -Wall -I ../libhydra -Wno-deprecated-declarations
|
||||||
|
|
|
@ -160,7 +160,8 @@ void State::buildRemote(ref<Store> destStore,
|
||||||
Machine::ptr machine, Step::ptr step,
|
Machine::ptr machine, Step::ptr step,
|
||||||
unsigned int maxSilentTime, unsigned int buildTimeout, unsigned int repeats,
|
unsigned int maxSilentTime, unsigned int buildTimeout, unsigned int repeats,
|
||||||
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,
|
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,
|
||||||
std::function<void(StepState)> updateStep)
|
std::function<void(StepState)> updateStep,
|
||||||
|
NarMemberDatas & narMembers)
|
||||||
{
|
{
|
||||||
assert(BuildResult::TimedOut == 8);
|
assert(BuildResult::TimedOut == 8);
|
||||||
|
|
||||||
|
@ -427,8 +428,6 @@ void State::buildRemote(ref<Store> destStore,
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Copy the output paths. */
|
/* Copy the output paths. */
|
||||||
result.accessor = destStore->getFSAccessor();
|
|
||||||
|
|
||||||
if (!machine->isLocalhost() || localStore != std::shared_ptr<Store>(destStore)) {
|
if (!machine->isLocalhost() || localStore != std::shared_ptr<Store>(destStore)) {
|
||||||
updateStep(ssReceivingOutputs);
|
updateStep(ssReceivingOutputs);
|
||||||
|
|
||||||
|
@ -475,7 +474,17 @@ void State::buildRemote(ref<Store> destStore,
|
||||||
auto & info = infos.find(path)->second;
|
auto & info = infos.find(path)->second;
|
||||||
to << cmdDumpStorePath << localStore->printStorePath(path);
|
to << cmdDumpStorePath << localStore->printStorePath(path);
|
||||||
to.flush();
|
to.flush();
|
||||||
destStore->addToStore(info, from);
|
|
||||||
|
/* Receive the NAR from the remote and add it to the
|
||||||
|
destination store. Meanwhile, extract all the info from the
|
||||||
|
NAR that getBuildOutput() needs. */
|
||||||
|
auto source2 = sinkToSource([&](Sink & sink)
|
||||||
|
{
|
||||||
|
TeeSource tee(from, sink);
|
||||||
|
extractNarData(tee, localStore->printStorePath(path), narMembers);
|
||||||
|
});
|
||||||
|
|
||||||
|
destStore->addToStore(info, *source2);
|
||||||
}
|
}
|
||||||
|
|
||||||
auto now2 = std::chrono::steady_clock::now();
|
auto now2 = std::chrono::steady_clock::now();
|
||||||
|
|
|
@ -8,8 +8,10 @@
|
||||||
using namespace nix;
|
using namespace nix;
|
||||||
|
|
||||||
|
|
||||||
BuildOutput getBuildOutput(nix::ref<Store> store,
|
BuildOutput getBuildOutput(
|
||||||
nix::ref<nix::FSAccessor> accessor, const Derivation & drv)
|
nix::ref<Store> store,
|
||||||
|
NarMemberDatas & narMembers,
|
||||||
|
const Derivation & drv)
|
||||||
{
|
{
|
||||||
BuildOutput res;
|
BuildOutput res;
|
||||||
|
|
||||||
|
@ -24,6 +26,20 @@ BuildOutput getBuildOutput(nix::ref<Store> store,
|
||||||
if (outputs.count(path)) res.size += info->narSize;
|
if (outputs.count(path)) res.size += info->narSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Fetch missing data. Usually buildRemote() will have extracted
|
||||||
|
this data from the incoming NARs. */
|
||||||
|
for (auto & output : outputs) {
|
||||||
|
auto outputS = store->printStorePath(output);
|
||||||
|
if (!narMembers.count(outputS)) {
|
||||||
|
printInfo("fetching NAR contents of '%s'...", outputS);
|
||||||
|
auto source = sinkToSource([&](Sink & sink)
|
||||||
|
{
|
||||||
|
store->narFromPath(output, sink);
|
||||||
|
});
|
||||||
|
extractNarData(*source, outputS, narMembers);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Get build products. */
|
/* Get build products. */
|
||||||
bool explicitProducts = false;
|
bool explicitProducts = false;
|
||||||
|
|
||||||
|
@ -39,17 +55,18 @@ BuildOutput getBuildOutput(nix::ref<Store> store,
|
||||||
for (auto & output : outputs) {
|
for (auto & output : outputs) {
|
||||||
auto outputS = store->printStorePath(output);
|
auto outputS = store->printStorePath(output);
|
||||||
|
|
||||||
Path failedFile = outputS + "/nix-support/failed";
|
if (narMembers.count(outputS + "/nix-support/failed"))
|
||||||
if (accessor->stat(failedFile).type == FSAccessor::Type::tRegular)
|
|
||||||
res.failed = true;
|
res.failed = true;
|
||||||
|
|
||||||
Path productsFile = outputS + "/nix-support/hydra-build-products";
|
auto productsFile = narMembers.find(outputS + "/nix-support/hydra-build-products");
|
||||||
if (accessor->stat(productsFile).type != FSAccessor::Type::tRegular)
|
if (productsFile == narMembers.end() ||
|
||||||
|
productsFile->second.type != FSAccessor::Type::tRegular)
|
||||||
continue;
|
continue;
|
||||||
|
assert(productsFile->second.contents);
|
||||||
|
|
||||||
explicitProducts = true;
|
explicitProducts = true;
|
||||||
|
|
||||||
for (auto & line : tokenizeString<Strings>(accessor->readFile(productsFile), "\n")) {
|
for (auto & line : tokenizeString<Strings>(productsFile->second.contents.value(), "\n")) {
|
||||||
BuildProduct product;
|
BuildProduct product;
|
||||||
|
|
||||||
std::smatch match;
|
std::smatch match;
|
||||||
|
@ -69,16 +86,15 @@ BuildOutput getBuildOutput(nix::ref<Store> store,
|
||||||
product.path = canonPath(product.path);
|
product.path = canonPath(product.path);
|
||||||
if (!store->isInStore(product.path)) continue;
|
if (!store->isInStore(product.path)) continue;
|
||||||
|
|
||||||
auto st = accessor->stat(product.path);
|
auto file = narMembers.find(product.path);
|
||||||
if (st.type == FSAccessor::Type::tMissing) continue;
|
if (file == narMembers.end()) continue;
|
||||||
|
|
||||||
product.name = product.path == store->printStorePath(output) ? "" : baseNameOf(product.path);
|
product.name = product.path == store->printStorePath(output) ? "" : baseNameOf(product.path);
|
||||||
|
|
||||||
if (st.type == FSAccessor::Type::tRegular) {
|
if (file->second.type == FSAccessor::Type::tRegular) {
|
||||||
product.isRegular = true;
|
product.isRegular = true;
|
||||||
product.fileSize = st.fileSize;
|
product.fileSize = file->second.fileSize.value();
|
||||||
auto contents = accessor->readFile(product.path);
|
product.sha256hash = file->second.sha256.value();
|
||||||
product.sha256hash = hashString(htSHA256, contents);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
res.products.push_back(product);
|
res.products.push_back(product);
|
||||||
|
@ -95,29 +111,30 @@ BuildOutput getBuildOutput(nix::ref<Store> store,
|
||||||
product.subtype = output.first == "out" ? "" : output.first;
|
product.subtype = output.first == "out" ? "" : output.first;
|
||||||
product.name = output.second.path.name();
|
product.name = output.second.path.name();
|
||||||
|
|
||||||
auto st = accessor->stat(product.path);
|
auto file = narMembers.find(product.path);
|
||||||
if (st.type == FSAccessor::Type::tMissing)
|
assert(file != narMembers.end());
|
||||||
throw Error("getting status of ‘%s’", product.path);
|
if (file->second.type == FSAccessor::Type::tDirectory)
|
||||||
if (st.type == FSAccessor::Type::tDirectory)
|
|
||||||
res.products.push_back(product);
|
res.products.push_back(product);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Get the release name from $output/nix-support/hydra-release-name. */
|
/* Get the release name from $output/nix-support/hydra-release-name. */
|
||||||
for (auto & output : outputs) {
|
for (auto & output : outputs) {
|
||||||
auto p = store->printStorePath(output) + "/nix-support/hydra-release-name";
|
auto file = narMembers.find(store->printStorePath(output) + "/nix-support/hydra-release-name");
|
||||||
if (accessor->stat(p).type != FSAccessor::Type::tRegular) continue;
|
if (file == narMembers.end() ||
|
||||||
try {
|
file->second.type != FSAccessor::Type::tRegular)
|
||||||
res.releaseName = trim(accessor->readFile(p));
|
continue;
|
||||||
} catch (Error & e) { continue; }
|
res.releaseName = trim(file->second.contents.value());
|
||||||
// FIXME: validate release name
|
// FIXME: validate release name
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Get metrics. */
|
/* Get metrics. */
|
||||||
for (auto & output : outputs) {
|
for (auto & output : outputs) {
|
||||||
auto metricsFile = store->printStorePath(output) + "/nix-support/hydra-metrics";
|
auto file = narMembers.find(store->printStorePath(output) + "/nix-support/hydra-metrics");
|
||||||
if (accessor->stat(metricsFile).type != FSAccessor::Type::tRegular) continue;
|
if (file == narMembers.end() ||
|
||||||
for (auto & line : tokenizeString<Strings>(accessor->readFile(metricsFile), "\n")) {
|
file->second.type != FSAccessor::Type::tRegular)
|
||||||
|
continue;
|
||||||
|
for (auto & line : tokenizeString<Strings>(file->second.contents.value(), "\n")) {
|
||||||
auto fields = tokenizeString<std::vector<std::string>>(line);
|
auto fields = tokenizeString<std::vector<std::string>>(line);
|
||||||
if (fields.size() < 2) continue;
|
if (fields.size() < 2) continue;
|
||||||
BuildMetric metric;
|
BuildMetric metric;
|
||||||
|
|
|
@ -5,6 +5,7 @@
|
||||||
#include "hash.hh"
|
#include "hash.hh"
|
||||||
#include "derivations.hh"
|
#include "derivations.hh"
|
||||||
#include "store-api.hh"
|
#include "store-api.hh"
|
||||||
|
#include "nar-extractor.hh"
|
||||||
|
|
||||||
struct BuildProduct
|
struct BuildProduct
|
||||||
{
|
{
|
||||||
|
@ -38,5 +39,7 @@ struct BuildOutput
|
||||||
std::map<std::string, BuildMetric> metrics;
|
std::map<std::string, BuildMetric> metrics;
|
||||||
};
|
};
|
||||||
|
|
||||||
BuildOutput getBuildOutput(nix::ref<nix::Store> store,
|
BuildOutput getBuildOutput(
|
||||||
nix::ref<nix::FSAccessor> accessor, const nix::Derivation & drv);
|
nix::ref<nix::Store> store,
|
||||||
|
NarMemberDatas & narMembers,
|
||||||
|
const nix::Derivation & drv);
|
||||||
|
|
|
@ -201,9 +201,11 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
|
||||||
};
|
};
|
||||||
|
|
||||||
/* Do the build. */
|
/* Do the build. */
|
||||||
|
NarMemberDatas narMembers;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
/* FIXME: referring builds may have conflicting timeouts. */
|
/* FIXME: referring builds may have conflicting timeouts. */
|
||||||
buildRemote(destStore, machine, step, maxSilentTime, buildTimeout, repeats, result, activeStep, updateStep);
|
buildRemote(destStore, machine, step, maxSilentTime, buildTimeout, repeats, result, activeStep, updateStep, narMembers);
|
||||||
} catch (Error & e) {
|
} catch (Error & e) {
|
||||||
if (activeStep->state_.lock()->cancelled) {
|
if (activeStep->state_.lock()->cancelled) {
|
||||||
printInfo("marking step %d of build %d as cancelled", stepNr, buildId);
|
printInfo("marking step %d of build %d as cancelled", stepNr, buildId);
|
||||||
|
@ -218,10 +220,8 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
|
||||||
|
|
||||||
if (result.stepStatus == bsSuccess) {
|
if (result.stepStatus == bsSuccess) {
|
||||||
updateStep(ssPostProcessing);
|
updateStep(ssPostProcessing);
|
||||||
res = getBuildOutput(destStore, ref<FSAccessor>(result.accessor), *step->drv);
|
res = getBuildOutput(destStore, narMembers, *step->drv);
|
||||||
}
|
}
|
||||||
|
|
||||||
result.accessor = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
time_t stepStopTime = time(0);
|
time_t stepStopTime = time(0);
|
||||||
|
|
82
src/hydra-queue-runner/nar-extractor.cc
Normal file
82
src/hydra-queue-runner/nar-extractor.cc
Normal file
|
@ -0,0 +1,82 @@
|
||||||
|
#include "nar-extractor.hh"
|
||||||
|
|
||||||
|
#include "archive.hh"
|
||||||
|
|
||||||
|
#include <unordered_set>
|
||||||
|
|
||||||
|
using namespace nix;
|
||||||
|
|
||||||
|
struct Extractor : ParseSink
|
||||||
|
{
|
||||||
|
std::unordered_set<Path> filesToKeep {
|
||||||
|
"/nix-support/hydra-build-products",
|
||||||
|
"/nix-support/hydra-release-name",
|
||||||
|
"/nix-support/hydra-metrics",
|
||||||
|
};
|
||||||
|
|
||||||
|
NarMemberDatas & members;
|
||||||
|
NarMemberData * curMember = nullptr;
|
||||||
|
Path prefix;
|
||||||
|
|
||||||
|
Extractor(NarMemberDatas & members, const Path & prefix)
|
||||||
|
: members(members), prefix(prefix)
|
||||||
|
{ }
|
||||||
|
|
||||||
|
void createDirectory(const Path & path) override
|
||||||
|
{
|
||||||
|
members.insert_or_assign(prefix + path, NarMemberData { .type = FSAccessor::Type::tDirectory });
|
||||||
|
}
|
||||||
|
|
||||||
|
void createRegularFile(const Path & path) override
|
||||||
|
{
|
||||||
|
curMember = &members.insert_or_assign(prefix + path, NarMemberData {
|
||||||
|
.type = FSAccessor::Type::tRegular,
|
||||||
|
.fileSize = 0,
|
||||||
|
.contents = filesToKeep.count(path) ? std::optional("") : std::nullopt,
|
||||||
|
}).first->second;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::optional<unsigned long long> expectedSize;
|
||||||
|
std::unique_ptr<HashSink> hashSink;
|
||||||
|
|
||||||
|
void preallocateContents(unsigned long long size) override
|
||||||
|
{
|
||||||
|
expectedSize = size;
|
||||||
|
hashSink = std::make_unique<HashSink>(htSHA256);
|
||||||
|
}
|
||||||
|
|
||||||
|
void receiveContents(unsigned char * data, unsigned int len) override
|
||||||
|
{
|
||||||
|
assert(expectedSize);
|
||||||
|
assert(curMember);
|
||||||
|
assert(hashSink);
|
||||||
|
*curMember->fileSize += len;
|
||||||
|
(*hashSink)(data, len);
|
||||||
|
if (curMember->contents) {
|
||||||
|
curMember->contents->append((char *) data, len);
|
||||||
|
}
|
||||||
|
assert(curMember->fileSize <= expectedSize);
|
||||||
|
if (curMember->fileSize == expectedSize) {
|
||||||
|
auto [hash, len] = hashSink->finish();
|
||||||
|
assert(curMember->fileSize == len);
|
||||||
|
curMember->sha256 = hash;
|
||||||
|
hashSink.reset();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void createSymlink(const Path & path, const string & target) override
|
||||||
|
{
|
||||||
|
members.insert_or_assign(prefix + path, NarMemberData { .type = FSAccessor::Type::tSymlink });
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
void extractNarData(
|
||||||
|
Source & source,
|
||||||
|
const Path & prefix,
|
||||||
|
NarMemberDatas & members)
|
||||||
|
{
|
||||||
|
Extractor extractor(members, prefix);
|
||||||
|
parseDump(extractor, source);
|
||||||
|
// Note: this point may not be reached if we're in a coroutine.
|
||||||
|
}
|
23
src/hydra-queue-runner/nar-extractor.hh
Normal file
23
src/hydra-queue-runner/nar-extractor.hh
Normal file
|
@ -0,0 +1,23 @@
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "fs-accessor.hh"
|
||||||
|
#include "types.hh"
|
||||||
|
#include "serialise.hh"
|
||||||
|
#include "hash.hh"
|
||||||
|
|
||||||
|
struct NarMemberData
|
||||||
|
{
|
||||||
|
nix::FSAccessor::Type type;
|
||||||
|
std::optional<unsigned long long> fileSize;
|
||||||
|
std::optional<std::string> contents;
|
||||||
|
std::optional<nix::Hash> sha256;
|
||||||
|
};
|
||||||
|
|
||||||
|
typedef std::map<nix::Path, NarMemberData> NarMemberDatas;
|
||||||
|
|
||||||
|
/* Read a NAR from a source and get to some info about every file
|
||||||
|
inside the NAR. */
|
||||||
|
void extractNarData(
|
||||||
|
nix::Source & source,
|
||||||
|
const nix::Path & prefix,
|
||||||
|
NarMemberDatas & members);
|
|
@ -672,5 +672,6 @@ BuildOutput State::getBuildOutputCached(Connection & conn, nix::ref<nix::Store>
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return getBuildOutput(destStore, destStore->getFSAccessor(), drv);
|
NarMemberDatas narMembers;
|
||||||
|
return getBuildOutput(destStore, narMembers, drv);
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
#include "pool.hh"
|
#include "pool.hh"
|
||||||
#include "store-api.hh"
|
#include "store-api.hh"
|
||||||
#include "sync.hh"
|
#include "sync.hh"
|
||||||
|
#include "nar-extractor.hh"
|
||||||
|
|
||||||
|
|
||||||
typedef unsigned int BuildID;
|
typedef unsigned int BuildID;
|
||||||
|
@ -64,7 +65,6 @@ struct RemoteResult
|
||||||
time_t startTime = 0, stopTime = 0;
|
time_t startTime = 0, stopTime = 0;
|
||||||
unsigned int overhead = 0;
|
unsigned int overhead = 0;
|
||||||
nix::Path logFile;
|
nix::Path logFile;
|
||||||
std::shared_ptr<nix::FSAccessor> accessor;
|
|
||||||
|
|
||||||
BuildStatus buildStatus() const
|
BuildStatus buildStatus() const
|
||||||
{
|
{
|
||||||
|
@ -518,7 +518,8 @@ private:
|
||||||
unsigned int maxSilentTime, unsigned int buildTimeout,
|
unsigned int maxSilentTime, unsigned int buildTimeout,
|
||||||
unsigned int repeats,
|
unsigned int repeats,
|
||||||
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,
|
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,
|
||||||
std::function<void(StepState)> updateStep);
|
std::function<void(StepState)> updateStep,
|
||||||
|
NarMemberDatas & narMembers);
|
||||||
|
|
||||||
void markSucceededBuild(pqxx::work & txn, Build::ptr build,
|
void markSucceededBuild(pqxx::work & txn, Build::ptr build,
|
||||||
const BuildOutput & res, bool isCachedBuild, time_t startTime, time_t stopTime);
|
const BuildOutput & res, bool isCachedBuild, time_t startTime, time_t stopTime);
|
||||||
|
|
Loading…
Reference in a new issue