forked from lix-project/lix
libutil: de-callback-ify computeClosure
only two users of this function exist. only one used it in a way that
even bears resemblance to asynchronicity, and even that one didn't do
it right. fully async and parallel computation would have only worked
if any getEdgesAsync never calls the continuation it receives itself,
only from more derived callbacks running on other threads. calling it
directly would cause the decoupling promise to be awaited immediately
*on the original thread*, completely negating all nice async effects.
Change-Id: I0aa640950cf327533a32dee410105efdabb448df
This commit is contained in:
parent
230860dbb8
commit
964ac8b0e8
|
@ -14,10 +14,9 @@ namespace nix {
|
||||||
void Store::computeFSClosure(const StorePathSet & startPaths,
|
void Store::computeFSClosure(const StorePathSet & startPaths,
|
||||||
StorePathSet & paths_, bool flipDirection, bool includeOutputs, bool includeDerivers)
|
StorePathSet & paths_, bool flipDirection, bool includeOutputs, bool includeDerivers)
|
||||||
{
|
{
|
||||||
std::function<std::set<StorePath>(const StorePath & path, std::future<ref<const ValidPathInfo>> &)> queryDeps;
|
std::function<std::set<StorePath>(const StorePath & path, ref<const ValidPathInfo>)> queryDeps;
|
||||||
if (flipDirection)
|
if (flipDirection)
|
||||||
queryDeps = [&](const StorePath& path,
|
queryDeps = [&](const StorePath& path, ref<const ValidPathInfo>) {
|
||||||
std::future<ref<const ValidPathInfo>> & fut) {
|
|
||||||
StorePathSet res;
|
StorePathSet res;
|
||||||
StorePathSet referrers;
|
StorePathSet referrers;
|
||||||
queryReferrers(path, referrers);
|
queryReferrers(path, referrers);
|
||||||
|
@ -36,10 +35,8 @@ void Store::computeFSClosure(const StorePathSet & startPaths,
|
||||||
return res;
|
return res;
|
||||||
};
|
};
|
||||||
else
|
else
|
||||||
queryDeps = [&](const StorePath& path,
|
queryDeps = [&](const StorePath& path, ref<const ValidPathInfo> info) {
|
||||||
std::future<ref<const ValidPathInfo>> & fut) {
|
|
||||||
StorePathSet res;
|
StorePathSet res;
|
||||||
auto info = fut.get();
|
|
||||||
for (auto& ref : info->references)
|
for (auto& ref : info->references)
|
||||||
if (ref != path)
|
if (ref != path)
|
||||||
res.insert(ref);
|
res.insert(ref);
|
||||||
|
@ -54,24 +51,11 @@ void Store::computeFSClosure(const StorePathSet & startPaths,
|
||||||
return res;
|
return res;
|
||||||
};
|
};
|
||||||
|
|
||||||
computeClosure<StorePath>(
|
paths_.merge(computeClosure<StorePath>(
|
||||||
startPaths, paths_,
|
startPaths,
|
||||||
[&](const StorePath& path,
|
[&](const StorePath& path) -> std::set<StorePath> {
|
||||||
std::function<void(std::promise<std::set<StorePath>>&)>
|
return queryDeps(path, queryPathInfo(path));
|
||||||
processEdges) {
|
}));
|
||||||
std::promise<std::set<StorePath>> promise;
|
|
||||||
std::function<void(std::future<ref<const ValidPathInfo>>)>
|
|
||||||
getDependencies =
|
|
||||||
[&](std::future<ref<const ValidPathInfo>> fut) {
|
|
||||||
try {
|
|
||||||
promise.set_value(queryDeps(path, fut));
|
|
||||||
} catch (...) {
|
|
||||||
promise.set_exception(std::current_exception());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
queryPathInfo(path, getDependencies);
|
|
||||||
processEdges(promise);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void Store::computeFSClosure(const StorePath & startPath,
|
void Store::computeFSClosure(const StorePath & startPath,
|
||||||
|
|
|
@ -43,20 +43,7 @@ void Realisation::closure(Store & store, const std::set<Realisation> & startOutp
|
||||||
return res;
|
return res;
|
||||||
};
|
};
|
||||||
|
|
||||||
computeClosure<Realisation>(
|
res.merge(computeClosure<Realisation>(startOutputs, getDeps));
|
||||||
startOutputs, res,
|
|
||||||
[&](const Realisation& current,
|
|
||||||
std::function<void(std::promise<std::set<Realisation>>&)>
|
|
||||||
processEdges) {
|
|
||||||
std::promise<std::set<Realisation>> promise;
|
|
||||||
try {
|
|
||||||
auto res = getDeps(current);
|
|
||||||
promise.set_value(res);
|
|
||||||
} catch (...) {
|
|
||||||
promise.set_exception(std::current_exception());
|
|
||||||
}
|
|
||||||
return processEdges(promise);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
nlohmann::json Realisation::toJSON() const {
|
nlohmann::json Realisation::toJSON() const {
|
||||||
|
|
|
@ -1,72 +1,32 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
///@file
|
///@file
|
||||||
|
|
||||||
|
#include <functional>
|
||||||
#include <set>
|
#include <set>
|
||||||
#include <future>
|
|
||||||
#include "sync.hh"
|
|
||||||
|
|
||||||
using std::set;
|
|
||||||
|
|
||||||
namespace nix {
|
namespace nix {
|
||||||
|
|
||||||
template<typename T>
|
template<typename T>
|
||||||
using GetEdgesAsync = std::function<void(const T &, std::function<void(std::promise<set<T>> &)>)>;
|
std::set<T> computeClosure(
|
||||||
|
std::set<T> startElts,
|
||||||
template<typename T>
|
std::function<std::set<T>(const T &)> getEdges
|
||||||
void computeClosure(
|
|
||||||
const set<T> startElts,
|
|
||||||
set<T> & res,
|
|
||||||
GetEdgesAsync<T> getEdgesAsync
|
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
struct State
|
std::set<T> res, queue = std::move(startElts);
|
||||||
{
|
|
||||||
size_t pending;
|
|
||||||
set<T> & res;
|
|
||||||
std::exception_ptr exc;
|
|
||||||
};
|
|
||||||
|
|
||||||
Sync<State> state_(State{0, res, 0});
|
while (!queue.empty()) {
|
||||||
|
std::set<T> next;
|
||||||
|
|
||||||
std::function<void(const T &)> enqueue;
|
for (auto & e : queue) {
|
||||||
|
if (res.insert(e).second) {
|
||||||
std::condition_variable done;
|
next.merge(getEdges(e));
|
||||||
|
|
||||||
enqueue = [&](const T & current) -> void {
|
|
||||||
{
|
|
||||||
auto state(state_.lock());
|
|
||||||
if (state->exc) return;
|
|
||||||
if (!state->res.insert(current).second) return;
|
|
||||||
state->pending++;
|
|
||||||
}
|
|
||||||
|
|
||||||
getEdgesAsync(current, [&](std::promise<set<T>> & prom) {
|
|
||||||
try {
|
|
||||||
auto children = prom.get_future().get();
|
|
||||||
for (auto & child : children)
|
|
||||||
enqueue(child);
|
|
||||||
{
|
|
||||||
auto state(state_.lock());
|
|
||||||
assert(state->pending);
|
|
||||||
if (!--state->pending) done.notify_one();
|
|
||||||
}
|
|
||||||
} catch (...) {
|
|
||||||
auto state(state_.lock());
|
|
||||||
if (!state->exc) state->exc = std::current_exception();
|
|
||||||
assert(state->pending);
|
|
||||||
if (!--state->pending) done.notify_one();
|
|
||||||
};
|
|
||||||
});
|
|
||||||
};
|
|
||||||
|
|
||||||
for (auto & startElt : startElts)
|
|
||||||
enqueue(startElt);
|
|
||||||
|
|
||||||
{
|
|
||||||
auto state(state_.lock());
|
|
||||||
while (state->pending) state.wait(done);
|
|
||||||
if (state->exc) std::rethrow_exception(state->exc);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
queue = std::move(next);
|
||||||
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,15 +16,11 @@ map<string, set<string>> testGraph = {
|
||||||
};
|
};
|
||||||
|
|
||||||
TEST(closure, correctClosure) {
|
TEST(closure, correctClosure) {
|
||||||
set<string> aClosure;
|
|
||||||
set<string> expectedClosure = {"A", "B", "C", "F", "G"};
|
set<string> expectedClosure = {"A", "B", "C", "F", "G"};
|
||||||
computeClosure<string>(
|
set<string> aClosure = computeClosure<string>(
|
||||||
{"A"},
|
{"A"},
|
||||||
aClosure,
|
[&](const string currentNode) {
|
||||||
[&](const string currentNode, function<void(promise<set<string>> &)> processEdges) {
|
return testGraph[currentNode];
|
||||||
promise<set<string>> promisedNodes;
|
|
||||||
promisedNodes.set_value(testGraph[currentNode]);
|
|
||||||
processEdges(promisedNodes);
|
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -33,12 +29,10 @@ TEST(closure, correctClosure) {
|
||||||
|
|
||||||
TEST(closure, properlyHandlesDirectExceptions) {
|
TEST(closure, properlyHandlesDirectExceptions) {
|
||||||
struct TestExn {};
|
struct TestExn {};
|
||||||
set<string> aClosure;
|
|
||||||
EXPECT_THROW(
|
EXPECT_THROW(
|
||||||
computeClosure<string>(
|
computeClosure<string>(
|
||||||
{"A"},
|
{"A"},
|
||||||
aClosure,
|
[&](const string currentNode) -> set<string> {
|
||||||
[&](const string currentNode, function<void(promise<set<string>> &)> processEdges) {
|
|
||||||
throw TestExn();
|
throw TestExn();
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
|
@ -46,25 +40,4 @@ TEST(closure, properlyHandlesDirectExceptions) {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(closure, properlyHandlesExceptionsInPromise) {
|
|
||||||
struct TestExn {};
|
|
||||||
set<string> aClosure;
|
|
||||||
EXPECT_THROW(
|
|
||||||
computeClosure<string>(
|
|
||||||
{"A"},
|
|
||||||
aClosure,
|
|
||||||
[&](const string currentNode, function<void(promise<set<string>> &)> processEdges) {
|
|
||||||
promise<set<string>> promise;
|
|
||||||
try {
|
|
||||||
throw TestExn();
|
|
||||||
} catch (...) {
|
|
||||||
promise.set_exception(std::current_exception());
|
|
||||||
}
|
|
||||||
processEdges(promise);
|
|
||||||
}
|
|
||||||
),
|
|
||||||
TestExn
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue