From 184558834a475955e6c5d4b38745544c2c2907a3 Mon Sep 17 00:00:00 2001 From: regnat Date: Tue, 18 May 2021 14:30:32 +0200 Subject: [PATCH] Extract a generic `computeClosure` function MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move the `closure` logic of `computeFSClosure` to its own (templated) function. This doesn’t bring much by itself (except for the ability to properly test the “closure” functionality independently from the rest), but it allows reusing it (in particular for the realisations which will require a very similar closure computation) --- src/libstore/misc.cc | 133 ++++++++++++++--------------------- src/libutil/closure.hh | 69 ++++++++++++++++++ src/libutil/tests/closure.cc | 70 ++++++++++++++++++ 3 files changed, 193 insertions(+), 79 deletions(-) create mode 100644 src/libutil/closure.hh create mode 100644 src/libutil/tests/closure.cc diff --git a/src/libstore/misc.cc b/src/libstore/misc.cc index a99a2fc78..bc5fd968c 100644 --- a/src/libstore/misc.cc +++ b/src/libstore/misc.cc @@ -6,98 +6,73 @@ #include "thread-pool.hh" #include "topo-sort.hh" #include "callback.hh" +#include "closure.hh" namespace nix { - void Store::computeFSClosure(const StorePathSet & startPaths, StorePathSet & paths_, bool flipDirection, bool includeOutputs, bool includeDerivers) { - struct State - { - size_t pending; - StorePathSet & paths; - std::exception_ptr exc; - }; + std::function(const StorePath & path, std::future> &)> queryDeps; + if (flipDirection) + queryDeps = [&](const StorePath& path, + std::future> & fut) { + StorePathSet res; + StorePathSet referrers; + queryReferrers(path, referrers); + for (auto& ref : referrers) + if (ref != path) + res.insert(ref); - Sync state_(State{0, paths_, 0}); + if (includeOutputs) + for (auto& i : queryValidDerivers(path)) + res.insert(i); - std::function enqueue; + if (includeDerivers && path.isDerivation()) + for (auto& i : queryDerivationOutputs(path)) + if (isValidPath(i) && queryPathInfo(i)->deriver == path) + res.insert(i); + return res; + }; + else + queryDeps = [&](const StorePath& path, + std::future> & fut) { + StorePathSet res; + auto info = fut.get(); + for (auto& ref : info->references) + if (ref != path) + res.insert(ref); - std::condition_variable done; + if (includeOutputs && path.isDerivation()) + for (auto& i : queryDerivationOutputs(path)) + if (isValidPath(i)) + res.insert(i); - enqueue = [&](const StorePath & path) -> void { - { - auto state(state_.lock()); - if (state->exc) return; - if (!state->paths.insert(path).second) return; - state->pending++; - } + if (includeDerivers && info->deriver && isValidPath(*info->deriver)) + res.insert(*info->deriver); + return res; + }; - queryPathInfo(path, {[&](std::future> fut) { - // FIXME: calls to isValidPath() should be async - - try { - auto info = fut.get(); - - if (flipDirection) { - - StorePathSet referrers; - queryReferrers(path, referrers); - for (auto & ref : referrers) - if (ref != path) - enqueue(ref); - - if (includeOutputs) - for (auto & i : queryValidDerivers(path)) - enqueue(i); - - if (includeDerivers && path.isDerivation()) - for (auto & i : queryDerivationOutputs(path)) - if (isValidPath(i) && queryPathInfo(i)->deriver == path) - enqueue(i); - - } else { - - for (auto & ref : info->references) - if (ref != path) - enqueue(ref); - - if (includeOutputs && path.isDerivation()) - for (auto & i : queryDerivationOutputs(path)) - if (isValidPath(i)) enqueue(i); - - if (includeDerivers && info->deriver && isValidPath(*info->deriver)) - enqueue(*info->deriver); - - } - - { - auto state(state_.lock()); - assert(state->pending); - if (!--state->pending) done.notify_one(); - } - - } catch (...) { - auto state(state_.lock()); - if (!state->exc) state->exc = std::current_exception(); - assert(state->pending); - if (!--state->pending) done.notify_one(); - }; - }}); - }; - - for (auto & startPath : startPaths) - enqueue(startPath); - - { - auto state(state_.lock()); - while (state->pending) state.wait(done); - if (state->exc) std::rethrow_exception(state->exc); - } + computeClosure( + startPaths, paths_, + [&](const StorePath& path, + std::function>&)> + processEdges) { + std::promise> promise; + std::function>)> + getDependencies = + [&](std::future> fut) { + try { + promise.set_value(queryDeps(path, fut)); + } catch (...) { + promise.set_exception(std::current_exception()); + } + }; + queryPathInfo(path, getDependencies); + processEdges(promise); + }); } - void Store::computeFSClosure(const StorePath & startPath, StorePathSet & paths_, bool flipDirection, bool includeOutputs, bool includeDerivers) { diff --git a/src/libutil/closure.hh b/src/libutil/closure.hh new file mode 100644 index 000000000..779b9b2d5 --- /dev/null +++ b/src/libutil/closure.hh @@ -0,0 +1,69 @@ +#include +#include +#include "sync.hh" + +using std::set; + +namespace nix { + +template +using GetEdgesAsync = std::function> &)>)>; + +template +void computeClosure( + const set startElts, + set & res, + GetEdgesAsync getEdgesAsync +) +{ + struct State + { + size_t pending; + set & res; + std::exception_ptr exc; + }; + + Sync state_(State{0, res, 0}); + + std::function enqueue; + + std::condition_variable done; + + enqueue = [&](const T & current) -> void { + { + auto state(state_.lock()); + if (state->exc) return; + if (!state->res.insert(current).second) return; + state->pending++; + } + + getEdgesAsync(current, [&](std::promise> & prom) { + try { + auto children = prom.get_future().get(); + for (auto & child : children) + enqueue(child); + { + auto state(state_.lock()); + assert(state->pending); + if (!--state->pending) done.notify_one(); + } + } catch (...) { + auto state(state_.lock()); + if (!state->exc) state->exc = std::current_exception(); + assert(state->pending); + if (!--state->pending) done.notify_one(); + }; + }); + }; + + for (auto & startElt : startElts) + enqueue(startElt); + + { + auto state(state_.lock()); + while (state->pending) state.wait(done); + if (state->exc) std::rethrow_exception(state->exc); + } +} + +} diff --git a/src/libutil/tests/closure.cc b/src/libutil/tests/closure.cc new file mode 100644 index 000000000..7597e7807 --- /dev/null +++ b/src/libutil/tests/closure.cc @@ -0,0 +1,70 @@ +#include "closure.hh" +#include + +namespace nix { + +using namespace std; + +map> testGraph = { + { "A", { "B", "C", "G" } }, + { "B", { "A" } }, // Loops back to A + { "C", { "F" } }, // Indirect reference + { "D", { "A" } }, // Not reachable, but has backreferences + { "E", {} }, // Just not reachable + { "F", {} }, + { "G", { "G" } }, // Self reference +}; + +TEST(closure, correctClosure) { + set aClosure; + set expectedClosure = {"A", "B", "C", "F", "G"}; + computeClosure( + {"A"}, + aClosure, + [&](const string currentNode, function> &)> processEdges) { + promise> promisedNodes; + promisedNodes.set_value(testGraph[currentNode]); + processEdges(promisedNodes); + } + ); + + ASSERT_EQ(aClosure, expectedClosure); +} + +TEST(closure, properlyHandlesDirectExceptions) { + struct TestExn {}; + set aClosure; + EXPECT_THROW( + computeClosure( + {"A"}, + aClosure, + [&](const string currentNode, function> &)> processEdges) { + throw TestExn(); + } + ), + TestExn + ); +} + +TEST(closure, properlyHandlesExceptionsInPromise) { + struct TestExn {}; + set aClosure; + EXPECT_THROW( + computeClosure( + {"A"}, + aClosure, + [&](const string currentNode, function> &)> processEdges) { + promise> promise; + try { + throw TestExn(); + } catch (...) { + promise.set_exception(std::current_exception()); + } + processEdges(promise); + } + ), + TestExn + ); +} + +}