Extract a generic computeClosure function

Move the `closure` logic of `computeFSClosure` to its own (templated) function.

This doesn’t bring much by itself (except for the ability to properly
test the “closure” functionality independently from the rest), but it
allows reusing it (in particular for the realisations which will require
a very similar closure computation)
This commit is contained in:
regnat 2021-05-18 14:30:32 +02:00
parent 7234cf39be
commit 184558834a
3 changed files with 193 additions and 79 deletions

View file

@ -6,98 +6,73 @@
#include "thread-pool.hh" #include "thread-pool.hh"
#include "topo-sort.hh" #include "topo-sort.hh"
#include "callback.hh" #include "callback.hh"
#include "closure.hh"
namespace nix { namespace nix {
void Store::computeFSClosure(const StorePathSet & startPaths, void Store::computeFSClosure(const StorePathSet & startPaths,
StorePathSet & paths_, bool flipDirection, bool includeOutputs, bool includeDerivers) StorePathSet & paths_, bool flipDirection, bool includeOutputs, bool includeDerivers)
{ {
struct State std::function<std::set<StorePath>(const StorePath & path, std::future<ref<const ValidPathInfo>> &)> queryDeps;
{ if (flipDirection)
size_t pending; queryDeps = [&](const StorePath& path,
StorePathSet & paths; std::future<ref<const ValidPathInfo>> & fut) {
std::exception_ptr exc; StorePathSet res;
};
Sync<State> state_(State{0, paths_, 0});
std::function<void(const StorePath &)> enqueue;
std::condition_variable done;
enqueue = [&](const StorePath & path) -> void {
{
auto state(state_.lock());
if (state->exc) return;
if (!state->paths.insert(path).second) return;
state->pending++;
}
queryPathInfo(path, {[&](std::future<ref<const ValidPathInfo>> fut) {
// FIXME: calls to isValidPath() should be async
try {
auto info = fut.get();
if (flipDirection) {
StorePathSet referrers; StorePathSet referrers;
queryReferrers(path, referrers); queryReferrers(path, referrers);
for (auto& ref : referrers) for (auto& ref : referrers)
if (ref != path) if (ref != path)
enqueue(ref); res.insert(ref);
if (includeOutputs) if (includeOutputs)
for (auto& i : queryValidDerivers(path)) for (auto& i : queryValidDerivers(path))
enqueue(i); res.insert(i);
if (includeDerivers && path.isDerivation()) if (includeDerivers && path.isDerivation())
for (auto& i : queryDerivationOutputs(path)) for (auto& i : queryDerivationOutputs(path))
if (isValidPath(i) && queryPathInfo(i)->deriver == path) if (isValidPath(i) && queryPathInfo(i)->deriver == path)
enqueue(i); res.insert(i);
return res;
} else { };
else
queryDeps = [&](const StorePath& path,
std::future<ref<const ValidPathInfo>> & fut) {
StorePathSet res;
auto info = fut.get();
for (auto& ref : info->references) for (auto& ref : info->references)
if (ref != path) if (ref != path)
enqueue(ref); res.insert(ref);
if (includeOutputs && path.isDerivation()) if (includeOutputs && path.isDerivation())
for (auto& i : queryDerivationOutputs(path)) for (auto& i : queryDerivationOutputs(path))
if (isValidPath(i)) enqueue(i); if (isValidPath(i))
res.insert(i);
if (includeDerivers && info->deriver && isValidPath(*info->deriver)) if (includeDerivers && info->deriver && isValidPath(*info->deriver))
enqueue(*info->deriver); res.insert(*info->deriver);
return res;
} };
{
auto state(state_.lock());
assert(state->pending);
if (!--state->pending) done.notify_one();
}
computeClosure<StorePath>(
startPaths, paths_,
[&](const StorePath& path,
std::function<void(std::promise<std::set<StorePath>>&)>
processEdges) {
std::promise<std::set<StorePath>> promise;
std::function<void(std::future<ref<const ValidPathInfo>>)>
getDependencies =
[&](std::future<ref<const ValidPathInfo>> fut) {
try {
promise.set_value(queryDeps(path, fut));
} catch (...) { } catch (...) {
auto state(state_.lock()); promise.set_exception(std::current_exception());
if (!state->exc) state->exc = std::current_exception();
assert(state->pending);
if (!--state->pending) done.notify_one();
};
}});
};
for (auto & startPath : startPaths)
enqueue(startPath);
{
auto state(state_.lock());
while (state->pending) state.wait(done);
if (state->exc) std::rethrow_exception(state->exc);
} }
};
queryPathInfo(path, getDependencies);
processEdges(promise);
});
} }
void Store::computeFSClosure(const StorePath & startPath, void Store::computeFSClosure(const StorePath & startPath,
StorePathSet & paths_, bool flipDirection, bool includeOutputs, bool includeDerivers) StorePathSet & paths_, bool flipDirection, bool includeOutputs, bool includeDerivers)
{ {

69
src/libutil/closure.hh Normal file
View file

@ -0,0 +1,69 @@
#include <set>
#include <future>
#include "sync.hh"
using std::set;
namespace nix {
template<typename T>
using GetEdgesAsync = std::function<void(const T &, std::function<void(std::promise<set<T>> &)>)>;
template<typename T>
void computeClosure(
const set<T> startElts,
set<T> & res,
GetEdgesAsync<T> getEdgesAsync
)
{
struct State
{
size_t pending;
set<T> & res;
std::exception_ptr exc;
};
Sync<State> state_(State{0, res, 0});
std::function<void(const T &)> enqueue;
std::condition_variable done;
enqueue = [&](const T & current) -> void {
{
auto state(state_.lock());
if (state->exc) return;
if (!state->res.insert(current).second) return;
state->pending++;
}
getEdgesAsync(current, [&](std::promise<set<T>> & prom) {
try {
auto children = prom.get_future().get();
for (auto & child : children)
enqueue(child);
{
auto state(state_.lock());
assert(state->pending);
if (!--state->pending) done.notify_one();
}
} catch (...) {
auto state(state_.lock());
if (!state->exc) state->exc = std::current_exception();
assert(state->pending);
if (!--state->pending) done.notify_one();
};
});
};
for (auto & startElt : startElts)
enqueue(startElt);
{
auto state(state_.lock());
while (state->pending) state.wait(done);
if (state->exc) std::rethrow_exception(state->exc);
}
}
}

View file

@ -0,0 +1,70 @@
#include "closure.hh"
#include <gtest/gtest.h>
namespace nix {
using namespace std;
map<string, set<string>> testGraph = {
{ "A", { "B", "C", "G" } },
{ "B", { "A" } }, // Loops back to A
{ "C", { "F" } }, // Indirect reference
{ "D", { "A" } }, // Not reachable, but has backreferences
{ "E", {} }, // Just not reachable
{ "F", {} },
{ "G", { "G" } }, // Self reference
};
TEST(closure, correctClosure) {
set<string> aClosure;
set<string> expectedClosure = {"A", "B", "C", "F", "G"};
computeClosure<string>(
{"A"},
aClosure,
[&](const string currentNode, function<void(promise<set<string>> &)> processEdges) {
promise<set<string>> promisedNodes;
promisedNodes.set_value(testGraph[currentNode]);
processEdges(promisedNodes);
}
);
ASSERT_EQ(aClosure, expectedClosure);
}
TEST(closure, properlyHandlesDirectExceptions) {
struct TestExn {};
set<string> aClosure;
EXPECT_THROW(
computeClosure<string>(
{"A"},
aClosure,
[&](const string currentNode, function<void(promise<set<string>> &)> processEdges) {
throw TestExn();
}
),
TestExn
);
}
TEST(closure, properlyHandlesExceptionsInPromise) {
struct TestExn {};
set<string> aClosure;
EXPECT_THROW(
computeClosure<string>(
{"A"},
aClosure,
[&](const string currentNode, function<void(promise<set<string>> &)> processEdges) {
promise<set<string>> promise;
try {
throw TestExn();
} catch (...) {
promise.set_exception(std::current_exception());
}
processEdges(promise);
}
),
TestExn
);
}
}