diff --git a/src/libstore/misc.cc b/src/libstore/misc.cc index a99a2fc78..bc5fd968c 100644 --- a/src/libstore/misc.cc +++ b/src/libstore/misc.cc @@ -6,98 +6,73 @@ #include "thread-pool.hh" #include "topo-sort.hh" #include "callback.hh" +#include "closure.hh" namespace nix { - void Store::computeFSClosure(const StorePathSet & startPaths, StorePathSet & paths_, bool flipDirection, bool includeOutputs, bool includeDerivers) { - struct State - { - size_t pending; - StorePathSet & paths; - std::exception_ptr exc; - }; + std::function(const StorePath & path, std::future> &)> queryDeps; + if (flipDirection) + queryDeps = [&](const StorePath& path, + std::future> & fut) { + StorePathSet res; + StorePathSet referrers; + queryReferrers(path, referrers); + for (auto& ref : referrers) + if (ref != path) + res.insert(ref); - Sync state_(State{0, paths_, 0}); + if (includeOutputs) + for (auto& i : queryValidDerivers(path)) + res.insert(i); - std::function enqueue; + if (includeDerivers && path.isDerivation()) + for (auto& i : queryDerivationOutputs(path)) + if (isValidPath(i) && queryPathInfo(i)->deriver == path) + res.insert(i); + return res; + }; + else + queryDeps = [&](const StorePath& path, + std::future> & fut) { + StorePathSet res; + auto info = fut.get(); + for (auto& ref : info->references) + if (ref != path) + res.insert(ref); - std::condition_variable done; + if (includeOutputs && path.isDerivation()) + for (auto& i : queryDerivationOutputs(path)) + if (isValidPath(i)) + res.insert(i); - enqueue = [&](const StorePath & path) -> void { - { - auto state(state_.lock()); - if (state->exc) return; - if (!state->paths.insert(path).second) return; - state->pending++; - } + if (includeDerivers && info->deriver && isValidPath(*info->deriver)) + res.insert(*info->deriver); + return res; + }; - queryPathInfo(path, {[&](std::future> fut) { - // FIXME: calls to isValidPath() should be async - - try { - auto info = fut.get(); - - if (flipDirection) { - - StorePathSet referrers; - queryReferrers(path, referrers); - for (auto & ref : referrers) - if (ref != path) - enqueue(ref); - - if (includeOutputs) - for (auto & i : queryValidDerivers(path)) - enqueue(i); - - if (includeDerivers && path.isDerivation()) - for (auto & i : queryDerivationOutputs(path)) - if (isValidPath(i) && queryPathInfo(i)->deriver == path) - enqueue(i); - - } else { - - for (auto & ref : info->references) - if (ref != path) - enqueue(ref); - - if (includeOutputs && path.isDerivation()) - for (auto & i : queryDerivationOutputs(path)) - if (isValidPath(i)) enqueue(i); - - if (includeDerivers && info->deriver && isValidPath(*info->deriver)) - enqueue(*info->deriver); - - } - - { - auto state(state_.lock()); - assert(state->pending); - if (!--state->pending) done.notify_one(); - } - - } catch (...) { - auto state(state_.lock()); - if (!state->exc) state->exc = std::current_exception(); - assert(state->pending); - if (!--state->pending) done.notify_one(); - }; - }}); - }; - - for (auto & startPath : startPaths) - enqueue(startPath); - - { - auto state(state_.lock()); - while (state->pending) state.wait(done); - if (state->exc) std::rethrow_exception(state->exc); - } + computeClosure( + startPaths, paths_, + [&](const StorePath& path, + std::function>&)> + processEdges) { + std::promise> promise; + std::function>)> + getDependencies = + [&](std::future> fut) { + try { + promise.set_value(queryDeps(path, fut)); + } catch (...) { + promise.set_exception(std::current_exception()); + } + }; + queryPathInfo(path, getDependencies); + processEdges(promise); + }); } - void Store::computeFSClosure(const StorePath & startPath, StorePathSet & paths_, bool flipDirection, bool includeOutputs, bool includeDerivers) { diff --git a/src/libutil/closure.hh b/src/libutil/closure.hh new file mode 100644 index 000000000..779b9b2d5 --- /dev/null +++ b/src/libutil/closure.hh @@ -0,0 +1,69 @@ +#include +#include +#include "sync.hh" + +using std::set; + +namespace nix { + +template +using GetEdgesAsync = std::function> &)>)>; + +template +void computeClosure( + const set startElts, + set & res, + GetEdgesAsync getEdgesAsync +) +{ + struct State + { + size_t pending; + set & res; + std::exception_ptr exc; + }; + + Sync state_(State{0, res, 0}); + + std::function enqueue; + + std::condition_variable done; + + enqueue = [&](const T & current) -> void { + { + auto state(state_.lock()); + if (state->exc) return; + if (!state->res.insert(current).second) return; + state->pending++; + } + + getEdgesAsync(current, [&](std::promise> & prom) { + try { + auto children = prom.get_future().get(); + for (auto & child : children) + enqueue(child); + { + auto state(state_.lock()); + assert(state->pending); + if (!--state->pending) done.notify_one(); + } + } catch (...) { + auto state(state_.lock()); + if (!state->exc) state->exc = std::current_exception(); + assert(state->pending); + if (!--state->pending) done.notify_one(); + }; + }); + }; + + for (auto & startElt : startElts) + enqueue(startElt); + + { + auto state(state_.lock()); + while (state->pending) state.wait(done); + if (state->exc) std::rethrow_exception(state->exc); + } +} + +} diff --git a/src/libutil/tests/closure.cc b/src/libutil/tests/closure.cc new file mode 100644 index 000000000..7597e7807 --- /dev/null +++ b/src/libutil/tests/closure.cc @@ -0,0 +1,70 @@ +#include "closure.hh" +#include + +namespace nix { + +using namespace std; + +map> testGraph = { + { "A", { "B", "C", "G" } }, + { "B", { "A" } }, // Loops back to A + { "C", { "F" } }, // Indirect reference + { "D", { "A" } }, // Not reachable, but has backreferences + { "E", {} }, // Just not reachable + { "F", {} }, + { "G", { "G" } }, // Self reference +}; + +TEST(closure, correctClosure) { + set aClosure; + set expectedClosure = {"A", "B", "C", "F", "G"}; + computeClosure( + {"A"}, + aClosure, + [&](const string currentNode, function> &)> processEdges) { + promise> promisedNodes; + promisedNodes.set_value(testGraph[currentNode]); + processEdges(promisedNodes); + } + ); + + ASSERT_EQ(aClosure, expectedClosure); +} + +TEST(closure, properlyHandlesDirectExceptions) { + struct TestExn {}; + set aClosure; + EXPECT_THROW( + computeClosure( + {"A"}, + aClosure, + [&](const string currentNode, function> &)> processEdges) { + throw TestExn(); + } + ), + TestExn + ); +} + +TEST(closure, properlyHandlesExceptionsInPromise) { + struct TestExn {}; + set aClosure; + EXPECT_THROW( + computeClosure( + {"A"}, + aClosure, + [&](const string currentNode, function> &)> processEdges) { + promise> promise; + try { + throw TestExn(); + } catch (...) { + promise.set_exception(std::current_exception()); + } + processEdges(promise); + } + ), + TestExn + ); +} + +}