forked from lix-project/lix
libstore: use notifications for stats counters
updating statistics *immediately* when any counter changes declutters
things somewhat and makes useful status reports less dependent on the
current worker main loop. using callbacks will make it easier to move
the worker loop into kj entirely, using only promises for scheduling.
Change-Id: I695dfa83111b1ec09b1a54cff268f3c1d7743ed6
This commit is contained in:
parent
c2b90d235f
commit
e0fd0ba211
10 changed files with 169 additions and 42 deletions
|
@ -71,7 +71,7 @@ DerivationGoal::DerivationGoal(const StorePath & drvPath,
|
||||||
DerivedPath::Built { makeConstantStorePathRef(drvPath), wantedOutputs }.to_string(worker.store));
|
DerivedPath::Built { makeConstantStorePathRef(drvPath), wantedOutputs }.to_string(worker.store));
|
||||||
trace("created");
|
trace("created");
|
||||||
|
|
||||||
mcExpectedBuilds = std::make_unique<MaintainCount<uint64_t>>(worker.expectedBuilds);
|
mcExpectedBuilds = worker.expectedBuilds.addTemporarily(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -91,7 +91,7 @@ DerivationGoal::DerivationGoal(const StorePath & drvPath, const BasicDerivation
|
||||||
DerivedPath::Built { makeConstantStorePathRef(drvPath), drv.outputNames() }.to_string(worker.store));
|
DerivedPath::Built { makeConstantStorePathRef(drvPath), drv.outputNames() }.to_string(worker.store));
|
||||||
trace("created");
|
trace("created");
|
||||||
|
|
||||||
mcExpectedBuilds = std::make_unique<MaintainCount<uint64_t>>(worker.expectedBuilds);
|
mcExpectedBuilds = worker.expectedBuilds.addTemporarily(1);
|
||||||
|
|
||||||
/* Prevent the .chroot directory from being
|
/* Prevent the .chroot directory from being
|
||||||
garbage-collected. (See isActiveTempFile() in gc.cc.) */
|
garbage-collected. (See isActiveTempFile() in gc.cc.) */
|
||||||
|
@ -662,7 +662,7 @@ void DerivationGoal::started()
|
||||||
if (hook) msg += fmt(" on '%s'", machineName);
|
if (hook) msg += fmt(" on '%s'", machineName);
|
||||||
act = std::make_unique<Activity>(*logger, lvlInfo, actBuild, msg,
|
act = std::make_unique<Activity>(*logger, lvlInfo, actBuild, msg,
|
||||||
Logger::Fields{worker.store.printStorePath(drvPath), hook ? machineName : "", 1, 1});
|
Logger::Fields{worker.store.printStorePath(drvPath), hook ? machineName : "", 1, 1});
|
||||||
mcRunningBuilds = std::make_unique<MaintainCount<uint64_t>>(worker.runningBuilds);
|
mcRunningBuilds = worker.runningBuilds.addTemporarily(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
Goal::WorkResult DerivationGoal::tryToBuild(bool inBuildSlot)
|
Goal::WorkResult DerivationGoal::tryToBuild(bool inBuildSlot)
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
///@file
|
///@file
|
||||||
|
|
||||||
|
#include "notifying-counter.hh"
|
||||||
#include "parsed-derivations.hh"
|
#include "parsed-derivations.hh"
|
||||||
#include "lock.hh"
|
#include "lock.hh"
|
||||||
#include "outputs-spec.hh"
|
#include "outputs-spec.hh"
|
||||||
|
@ -217,7 +218,7 @@ struct DerivationGoal : public Goal
|
||||||
|
|
||||||
BuildMode buildMode;
|
BuildMode buildMode;
|
||||||
|
|
||||||
std::unique_ptr<MaintainCount<uint64_t>> mcExpectedBuilds, mcRunningBuilds;
|
NotifyingCounter<uint64_t>::Bump mcExpectedBuilds, mcRunningBuilds;
|
||||||
|
|
||||||
std::unique_ptr<Activity> act;
|
std::unique_ptr<Activity> act;
|
||||||
|
|
||||||
|
|
|
@ -42,8 +42,7 @@ Goal::WorkResult DrvOutputSubstitutionGoal::tryNext(bool inBuildSlot)
|
||||||
return WaitForSlot{};
|
return WaitForSlot{};
|
||||||
}
|
}
|
||||||
|
|
||||||
maintainRunningSubstitutions =
|
maintainRunningSubstitutions = worker.runningSubstitutions.addTemporarily(1);
|
||||||
std::make_unique<MaintainCount<uint64_t>>(worker.runningSubstitutions);
|
|
||||||
|
|
||||||
if (subs.size() == 0) {
|
if (subs.size() == 0) {
|
||||||
/* None left. Terminate this goal and let someone else deal
|
/* None left. Terminate this goal and let someone else deal
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
///@file
|
///@file
|
||||||
|
|
||||||
|
#include "notifying-counter.hh"
|
||||||
#include "store-api.hh"
|
#include "store-api.hh"
|
||||||
#include "goal.hh"
|
#include "goal.hh"
|
||||||
#include "realisation.hh"
|
#include "realisation.hh"
|
||||||
|
@ -40,7 +41,7 @@ class DrvOutputSubstitutionGoal : public Goal {
|
||||||
*/
|
*/
|
||||||
std::shared_ptr<Store> sub;
|
std::shared_ptr<Store> sub;
|
||||||
|
|
||||||
std::unique_ptr<MaintainCount<uint64_t>> maintainRunningSubstitutions;
|
NotifyingCounter<uint64_t>::Bump maintainRunningSubstitutions;
|
||||||
|
|
||||||
struct DownloadState
|
struct DownloadState
|
||||||
{
|
{
|
||||||
|
|
|
@ -21,7 +21,7 @@ PathSubstitutionGoal::PathSubstitutionGoal(
|
||||||
state = &PathSubstitutionGoal::init;
|
state = &PathSubstitutionGoal::init;
|
||||||
name = fmt("substitution of '%s'", worker.store.printStorePath(this->storePath));
|
name = fmt("substitution of '%s'", worker.store.printStorePath(this->storePath));
|
||||||
trace("created");
|
trace("created");
|
||||||
maintainExpectedSubstitutions = std::make_unique<MaintainCount<uint64_t>>(worker.expectedSubstitutions);
|
maintainExpectedSubstitutions = worker.expectedSubstitutions.addTemporarily(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -139,11 +139,11 @@ Goal::WorkResult PathSubstitutionGoal::tryNext(bool inBuildSlot)
|
||||||
/* Update the total expected download size. */
|
/* Update the total expected download size. */
|
||||||
auto narInfo = std::dynamic_pointer_cast<const NarInfo>(info);
|
auto narInfo = std::dynamic_pointer_cast<const NarInfo>(info);
|
||||||
|
|
||||||
maintainExpectedNar = std::make_unique<MaintainCount<uint64_t>>(worker.expectedNarSize, info->narSize);
|
maintainExpectedNar = worker.expectedNarSize.addTemporarily(info->narSize);
|
||||||
|
|
||||||
maintainExpectedDownload =
|
maintainExpectedDownload =
|
||||||
narInfo && narInfo->fileSize
|
narInfo && narInfo->fileSize
|
||||||
? std::make_unique<MaintainCount<uint64_t>>(worker.expectedDownloadSize, narInfo->fileSize)
|
? worker.expectedDownloadSize.addTemporarily(narInfo->fileSize)
|
||||||
: nullptr;
|
: nullptr;
|
||||||
|
|
||||||
/* Bail out early if this substituter lacks a valid
|
/* Bail out early if this substituter lacks a valid
|
||||||
|
@ -200,7 +200,7 @@ Goal::WorkResult PathSubstitutionGoal::tryToRun(bool inBuildSlot)
|
||||||
return WaitForSlot{};
|
return WaitForSlot{};
|
||||||
}
|
}
|
||||||
|
|
||||||
maintainRunningSubstitutions = std::make_unique<MaintainCount<uint64_t>>(worker.runningSubstitutions);
|
maintainRunningSubstitutions = worker.runningSubstitutions.addTemporarily(1);
|
||||||
|
|
||||||
outPipe.create();
|
outPipe.create();
|
||||||
|
|
||||||
|
@ -268,13 +268,10 @@ Goal::WorkResult PathSubstitutionGoal::finished(bool inBuildSlot)
|
||||||
maintainExpectedSubstitutions.reset();
|
maintainExpectedSubstitutions.reset();
|
||||||
worker.doneSubstitutions++;
|
worker.doneSubstitutions++;
|
||||||
|
|
||||||
if (maintainExpectedDownload) {
|
worker.doneDownloadSize += maintainExpectedDownload.delta();
|
||||||
auto fileSize = maintainExpectedDownload->delta;
|
maintainExpectedDownload.reset();
|
||||||
maintainExpectedDownload.reset();
|
|
||||||
worker.doneDownloadSize += fileSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
worker.doneNarSize += maintainExpectedNar->delta;
|
worker.doneNarSize += maintainExpectedNar.delta();
|
||||||
maintainExpectedNar.reset();
|
maintainExpectedNar.reset();
|
||||||
|
|
||||||
return done(ecSuccess, BuildResult::Substituted);
|
return done(ecSuccess, BuildResult::Substituted);
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
///@file
|
///@file
|
||||||
|
|
||||||
#include "lock.hh"
|
#include "lock.hh"
|
||||||
|
#include "notifying-counter.hh"
|
||||||
#include "store-api.hh"
|
#include "store-api.hh"
|
||||||
#include "goal.hh"
|
#include "goal.hh"
|
||||||
|
|
||||||
|
@ -63,7 +64,7 @@ struct PathSubstitutionGoal : public Goal
|
||||||
*/
|
*/
|
||||||
Path destPath;
|
Path destPath;
|
||||||
|
|
||||||
std::unique_ptr<MaintainCount<uint64_t>> maintainExpectedSubstitutions,
|
NotifyingCounter<uint64_t>::Bump maintainExpectedSubstitutions,
|
||||||
maintainRunningSubstitutions, maintainExpectedNar, maintainExpectedDownload;
|
maintainRunningSubstitutions, maintainExpectedNar, maintainExpectedDownload;
|
||||||
|
|
||||||
typedef WorkResult (PathSubstitutionGoal::*GoalState)(bool inBuildSlot);
|
typedef WorkResult (PathSubstitutionGoal::*GoalState)(bool inBuildSlot);
|
||||||
|
|
|
@ -320,6 +320,27 @@ void Worker::waitForAWhile(GoalPtr goal)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void Worker::updateStatistics()
|
||||||
|
{
|
||||||
|
// only update progress info while running. this notably excludes updating
|
||||||
|
// progress info while destroying, which causes the progress bar to assert
|
||||||
|
if (running && statisticsOutdated) {
|
||||||
|
actDerivations.progress(
|
||||||
|
doneBuilds, expectedBuilds + doneBuilds, runningBuilds, failedBuilds
|
||||||
|
);
|
||||||
|
actSubstitutions.progress(
|
||||||
|
doneSubstitutions,
|
||||||
|
expectedSubstitutions + doneSubstitutions,
|
||||||
|
runningSubstitutions,
|
||||||
|
failedSubstitutions
|
||||||
|
);
|
||||||
|
act.setExpected(actFileTransfer, expectedDownloadSize + doneDownloadSize);
|
||||||
|
act.setExpected(actCopyPath, expectedNarSize + doneNarSize);
|
||||||
|
|
||||||
|
statisticsOutdated = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Goals Worker::run(std::function<Goals (GoalFactory &)> req)
|
Goals Worker::run(std::function<Goals (GoalFactory &)> req)
|
||||||
{
|
{
|
||||||
auto _topGoals = req(goalFactory());
|
auto _topGoals = req(goalFactory());
|
||||||
|
@ -329,6 +350,8 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req)
|
||||||
running = true;
|
running = true;
|
||||||
Finally const _stop([&] { running = false; });
|
Finally const _stop([&] { running = false; });
|
||||||
|
|
||||||
|
updateStatistics();
|
||||||
|
|
||||||
for (auto & i : _topGoals) {
|
for (auto & i : _topGoals) {
|
||||||
topGoals.insert(i);
|
topGoals.insert(i);
|
||||||
if (auto goal = dynamic_cast<DerivationGoal *>(i.get())) {
|
if (auto goal = dynamic_cast<DerivationGoal *>(i.get())) {
|
||||||
|
@ -373,18 +396,7 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req)
|
||||||
? nrSubstitutions < std::max(1U, (unsigned int) settings.maxSubstitutionJobs)
|
? nrSubstitutions < std::max(1U, (unsigned int) settings.maxSubstitutionJobs)
|
||||||
: nrLocalBuilds < settings.maxBuildJobs;
|
: nrLocalBuilds < settings.maxBuildJobs;
|
||||||
handleWorkResult(goal, goal->work(inSlot));
|
handleWorkResult(goal, goal->work(inSlot));
|
||||||
|
updateStatistics();
|
||||||
actDerivations.progress(
|
|
||||||
doneBuilds, expectedBuilds + doneBuilds, runningBuilds, failedBuilds
|
|
||||||
);
|
|
||||||
actSubstitutions.progress(
|
|
||||||
doneSubstitutions,
|
|
||||||
expectedSubstitutions + doneSubstitutions,
|
|
||||||
runningSubstitutions,
|
|
||||||
failedSubstitutions
|
|
||||||
);
|
|
||||||
act.setExpected(actFileTransfer, expectedDownloadSize + doneDownloadSize);
|
|
||||||
act.setExpected(actCopyPath, expectedNarSize + doneNarSize);
|
|
||||||
|
|
||||||
if (topGoals.empty()) break; // stuff may have been cancelled
|
if (topGoals.empty()) break; // stuff may have been cancelled
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
///@file
|
///@file
|
||||||
|
|
||||||
|
#include "notifying-counter.hh"
|
||||||
#include "types.hh"
|
#include "types.hh"
|
||||||
#include "lock.hh"
|
#include "lock.hh"
|
||||||
#include "store-api.hh"
|
#include "store-api.hh"
|
||||||
|
@ -213,6 +214,21 @@ private:
|
||||||
void childStarted(GoalPtr goal, const std::set<int> & fds,
|
void childStarted(GoalPtr goal, const std::set<int> & fds,
|
||||||
bool inBuildSlot);
|
bool inBuildSlot);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pass current stats counters to the logger for progress bar updates.
|
||||||
|
*/
|
||||||
|
void updateStatistics();
|
||||||
|
|
||||||
|
bool statisticsOutdated = true;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Mark statistics as outdated, such that `updateStatistics` will be called.
|
||||||
|
*/
|
||||||
|
void updateStatisticsLater()
|
||||||
|
{
|
||||||
|
statisticsOutdated = true;
|
||||||
|
}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
|
||||||
const Activity act;
|
const Activity act;
|
||||||
|
@ -234,19 +250,19 @@ public:
|
||||||
|
|
||||||
HookState hook;
|
HookState hook;
|
||||||
|
|
||||||
uint64_t expectedBuilds = 0;
|
NotifyingCounter<uint64_t> expectedBuilds{[this] { updateStatisticsLater(); }};
|
||||||
uint64_t doneBuilds = 0;
|
NotifyingCounter<uint64_t> doneBuilds{[this] { updateStatisticsLater(); }};
|
||||||
uint64_t failedBuilds = 0;
|
NotifyingCounter<uint64_t> failedBuilds{[this] { updateStatisticsLater(); }};
|
||||||
uint64_t runningBuilds = 0;
|
NotifyingCounter<uint64_t> runningBuilds{[this] { updateStatisticsLater(); }};
|
||||||
|
|
||||||
uint64_t expectedSubstitutions = 0;
|
NotifyingCounter<uint64_t> expectedSubstitutions{[this] { updateStatisticsLater(); }};
|
||||||
uint64_t doneSubstitutions = 0;
|
NotifyingCounter<uint64_t> doneSubstitutions{[this] { updateStatisticsLater(); }};
|
||||||
uint64_t failedSubstitutions = 0;
|
NotifyingCounter<uint64_t> failedSubstitutions{[this] { updateStatisticsLater(); }};
|
||||||
uint64_t runningSubstitutions = 0;
|
NotifyingCounter<uint64_t> runningSubstitutions{[this] { updateStatisticsLater(); }};
|
||||||
uint64_t expectedDownloadSize = 0;
|
NotifyingCounter<uint64_t> expectedDownloadSize{[this] { updateStatisticsLater(); }};
|
||||||
uint64_t doneDownloadSize = 0;
|
NotifyingCounter<uint64_t> doneDownloadSize{[this] { updateStatisticsLater(); }};
|
||||||
uint64_t expectedNarSize = 0;
|
NotifyingCounter<uint64_t> expectedNarSize{[this] { updateStatisticsLater(); }};
|
||||||
uint64_t doneNarSize = 0;
|
NotifyingCounter<uint64_t> doneNarSize{[this] { updateStatisticsLater(); }};
|
||||||
|
|
||||||
Worker(Store & store, Store & evalStore);
|
Worker(Store & store, Store & evalStore);
|
||||||
~Worker();
|
~Worker();
|
||||||
|
|
|
@ -95,6 +95,7 @@ libutil_headers = files(
|
||||||
'monitor-fd.hh',
|
'monitor-fd.hh',
|
||||||
'mount.hh',
|
'mount.hh',
|
||||||
'namespaces.hh',
|
'namespaces.hh',
|
||||||
|
'notifying-counter.hh',
|
||||||
'pool.hh',
|
'pool.hh',
|
||||||
'position.hh',
|
'position.hh',
|
||||||
'print-elided.hh',
|
'print-elided.hh',
|
||||||
|
|
99
src/libutil/notifying-counter.hh
Normal file
99
src/libutil/notifying-counter.hh
Normal file
|
@ -0,0 +1,99 @@
|
||||||
|
#pragma once
|
||||||
|
/// @file
|
||||||
|
|
||||||
|
#include <cassert>
|
||||||
|
#include <functional>
|
||||||
|
#include <memory>
|
||||||
|
|
||||||
|
namespace nix {
|
||||||
|
|
||||||
|
template<std::integral T>
|
||||||
|
class NotifyingCounter
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
T counter;
|
||||||
|
std::function<void()> notify;
|
||||||
|
|
||||||
|
public:
|
||||||
|
class Bump
|
||||||
|
{
|
||||||
|
friend class NotifyingCounter;
|
||||||
|
|
||||||
|
struct SubOnFree
|
||||||
|
{
|
||||||
|
T delta;
|
||||||
|
|
||||||
|
void operator()(NotifyingCounter * c) const
|
||||||
|
{
|
||||||
|
c->add(-delta);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// lightly misuse unique_ptr to get RAII types with destructor callbacks
|
||||||
|
std::unique_ptr<NotifyingCounter<T>, SubOnFree> at;
|
||||||
|
|
||||||
|
Bump(NotifyingCounter<T> & at, T delta) : at(&at, {delta}) {}
|
||||||
|
|
||||||
|
public:
|
||||||
|
Bump() = default;
|
||||||
|
Bump(decltype(nullptr)) {}
|
||||||
|
|
||||||
|
T delta() const
|
||||||
|
{
|
||||||
|
return at ? at.get_deleter().delta : 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void reset()
|
||||||
|
{
|
||||||
|
at.reset();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
explicit NotifyingCounter(std::function<void()> notify, T initial = 0)
|
||||||
|
: counter(initial)
|
||||||
|
, notify(std::move(notify))
|
||||||
|
{
|
||||||
|
assert(this->notify);
|
||||||
|
}
|
||||||
|
|
||||||
|
// bumps hold pointers to this, so we should neither copy nor move.
|
||||||
|
NotifyingCounter(const NotifyingCounter &) = delete;
|
||||||
|
NotifyingCounter & operator=(const NotifyingCounter &) = delete;
|
||||||
|
NotifyingCounter(NotifyingCounter &&) = delete;
|
||||||
|
NotifyingCounter & operator=(NotifyingCounter &&) = delete;
|
||||||
|
|
||||||
|
T get() const
|
||||||
|
{
|
||||||
|
return counter;
|
||||||
|
}
|
||||||
|
|
||||||
|
operator T() const
|
||||||
|
{
|
||||||
|
return counter;
|
||||||
|
}
|
||||||
|
|
||||||
|
void add(T delta)
|
||||||
|
{
|
||||||
|
counter += delta;
|
||||||
|
notify();
|
||||||
|
}
|
||||||
|
|
||||||
|
NotifyingCounter & operator+=(T delta)
|
||||||
|
{
|
||||||
|
add(delta);
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
NotifyingCounter & operator++(int)
|
||||||
|
{
|
||||||
|
return *this += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
Bump addTemporarily(T delta)
|
||||||
|
{
|
||||||
|
add(delta);
|
||||||
|
return Bump{*this, delta};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in a new issue