Merge pull request #4834 from NixOS/generic-closure-function

Extract a generic `computeClosure` function
This commit is contained in:
Eelco Dolstra 2021-05-19 13:31:30 +02:00 committed by GitHub
commit af4ff644d5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 193 additions and 79 deletions

View file

@ -6,98 +6,73 @@
#include "thread-pool.hh" #include "thread-pool.hh"
#include "topo-sort.hh" #include "topo-sort.hh"
#include "callback.hh" #include "callback.hh"
#include "closure.hh"
namespace nix { namespace nix {
void Store::computeFSClosure(const StorePathSet & startPaths, void Store::computeFSClosure(const StorePathSet & startPaths,
StorePathSet & paths_, bool flipDirection, bool includeOutputs, bool includeDerivers) StorePathSet & paths_, bool flipDirection, bool includeOutputs, bool includeDerivers)
{ {
struct State std::function<std::set<StorePath>(const StorePath & path, std::future<ref<const ValidPathInfo>> &)> queryDeps;
{ if (flipDirection)
size_t pending; queryDeps = [&](const StorePath& path,
StorePathSet & paths; std::future<ref<const ValidPathInfo>> & fut) {
std::exception_ptr exc; StorePathSet res;
};
Sync<State> state_(State{0, paths_, 0});
std::function<void(const StorePath &)> enqueue;
std::condition_variable done;
enqueue = [&](const StorePath & path) -> void {
{
auto state(state_.lock());
if (state->exc) return;
if (!state->paths.insert(path).second) return;
state->pending++;
}
queryPathInfo(path, {[&](std::future<ref<const ValidPathInfo>> fut) {
// FIXME: calls to isValidPath() should be async
try {
auto info = fut.get();
if (flipDirection) {
StorePathSet referrers; StorePathSet referrers;
queryReferrers(path, referrers); queryReferrers(path, referrers);
for (auto& ref : referrers) for (auto& ref : referrers)
if (ref != path) if (ref != path)
enqueue(ref); res.insert(ref);
if (includeOutputs) if (includeOutputs)
for (auto& i : queryValidDerivers(path)) for (auto& i : queryValidDerivers(path))
enqueue(i); res.insert(i);
if (includeDerivers && path.isDerivation()) if (includeDerivers && path.isDerivation())
for (auto& i : queryDerivationOutputs(path)) for (auto& i : queryDerivationOutputs(path))
if (isValidPath(i) && queryPathInfo(i)->deriver == path) if (isValidPath(i) && queryPathInfo(i)->deriver == path)
enqueue(i); res.insert(i);
return res;
} else { };
else
queryDeps = [&](const StorePath& path,
std::future<ref<const ValidPathInfo>> & fut) {
StorePathSet res;
auto info = fut.get();
for (auto& ref : info->references) for (auto& ref : info->references)
if (ref != path) if (ref != path)
enqueue(ref); res.insert(ref);
if (includeOutputs && path.isDerivation()) if (includeOutputs && path.isDerivation())
for (auto& i : queryDerivationOutputs(path)) for (auto& i : queryDerivationOutputs(path))
if (isValidPath(i)) enqueue(i); if (isValidPath(i))
res.insert(i);
if (includeDerivers && info->deriver && isValidPath(*info->deriver)) if (includeDerivers && info->deriver && isValidPath(*info->deriver))
enqueue(*info->deriver); res.insert(*info->deriver);
return res;
} };
{
auto state(state_.lock());
assert(state->pending);
if (!--state->pending) done.notify_one();
}
computeClosure<StorePath>(
startPaths, paths_,
[&](const StorePath& path,
std::function<void(std::promise<std::set<StorePath>>&)>
processEdges) {
std::promise<std::set<StorePath>> promise;
std::function<void(std::future<ref<const ValidPathInfo>>)>
getDependencies =
[&](std::future<ref<const ValidPathInfo>> fut) {
try {
promise.set_value(queryDeps(path, fut));
} catch (...) { } catch (...) {
auto state(state_.lock()); promise.set_exception(std::current_exception());
if (!state->exc) state->exc = std::current_exception();
assert(state->pending);
if (!--state->pending) done.notify_one();
};
}});
};
for (auto & startPath : startPaths)
enqueue(startPath);
{
auto state(state_.lock());
while (state->pending) state.wait(done);
if (state->exc) std::rethrow_exception(state->exc);
} }
};
queryPathInfo(path, getDependencies);
processEdges(promise);
});
} }
void Store::computeFSClosure(const StorePath & startPath, void Store::computeFSClosure(const StorePath & startPath,
StorePathSet & paths_, bool flipDirection, bool includeOutputs, bool includeDerivers) StorePathSet & paths_, bool flipDirection, bool includeOutputs, bool includeDerivers)
{ {

69
src/libutil/closure.hh Normal file
View file

@ -0,0 +1,69 @@
#include <set>
#include <future>
#include "sync.hh"
using std::set;
namespace nix {
template<typename T>
using GetEdgesAsync = std::function<void(const T &, std::function<void(std::promise<set<T>> &)>)>;
template<typename T>
void computeClosure(
const set<T> startElts,
set<T> & res,
GetEdgesAsync<T> getEdgesAsync
)
{
struct State
{
size_t pending;
set<T> & res;
std::exception_ptr exc;
};
Sync<State> state_(State{0, res, 0});
std::function<void(const T &)> enqueue;
std::condition_variable done;
enqueue = [&](const T & current) -> void {
{
auto state(state_.lock());
if (state->exc) return;
if (!state->res.insert(current).second) return;
state->pending++;
}
getEdgesAsync(current, [&](std::promise<set<T>> & prom) {
try {
auto children = prom.get_future().get();
for (auto & child : children)
enqueue(child);
{
auto state(state_.lock());
assert(state->pending);
if (!--state->pending) done.notify_one();
}
} catch (...) {
auto state(state_.lock());
if (!state->exc) state->exc = std::current_exception();
assert(state->pending);
if (!--state->pending) done.notify_one();
};
});
};
for (auto & startElt : startElts)
enqueue(startElt);
{
auto state(state_.lock());
while (state->pending) state.wait(done);
if (state->exc) std::rethrow_exception(state->exc);
}
}
}

View file

@ -0,0 +1,70 @@
#include "closure.hh"
#include <gtest/gtest.h>
namespace nix {
using namespace std;
map<string, set<string>> testGraph = {
{ "A", { "B", "C", "G" } },
{ "B", { "A" } }, // Loops back to A
{ "C", { "F" } }, // Indirect reference
{ "D", { "A" } }, // Not reachable, but has backreferences
{ "E", {} }, // Just not reachable
{ "F", {} },
{ "G", { "G" } }, // Self reference
};
TEST(closure, correctClosure) {
set<string> aClosure;
set<string> expectedClosure = {"A", "B", "C", "F", "G"};
computeClosure<string>(
{"A"},
aClosure,
[&](const string currentNode, function<void(promise<set<string>> &)> processEdges) {
promise<set<string>> promisedNodes;
promisedNodes.set_value(testGraph[currentNode]);
processEdges(promisedNodes);
}
);
ASSERT_EQ(aClosure, expectedClosure);
}
TEST(closure, properlyHandlesDirectExceptions) {
struct TestExn {};
set<string> aClosure;
EXPECT_THROW(
computeClosure<string>(
{"A"},
aClosure,
[&](const string currentNode, function<void(promise<set<string>> &)> processEdges) {
throw TestExn();
}
),
TestExn
);
}
TEST(closure, properlyHandlesExceptionsInPromise) {
struct TestExn {};
set<string> aClosure;
EXPECT_THROW(
computeClosure<string>(
{"A"},
aClosure,
[&](const string currentNode, function<void(promise<set<string>> &)> processEdges) {
promise<set<string>> promise;
try {
throw TestExn();
} catch (...) {
promise.set_exception(std::current_exception());
}
processEdges(promise);
}
),
TestExn
);
}
}