Merge "gc: refactor the gc server thread out into a class without changing it" into main

This commit is contained in:
jade 2024-07-21 10:36:10 +00:00 committed by Gerrit Code Review
commit 4fa6961aa2
4 changed files with 197 additions and 132 deletions

View file

@ -22,8 +22,8 @@
namespace nix { namespace nix {
static std::string gcSocketPath = "/gc-socket/socket"; constexpr static const std::string_view gcSocketPath = "/gc-socket/socket";
static std::string gcRootsDir = "gcroots"; constexpr static const std::string_view gcRootsDir = "gcroots";
static void makeSymlink(const Path & link, const Path & target) static void makeSymlink(const Path & link, const Path & target)
@ -359,16 +359,34 @@ void LocalStore::findRuntimeRoots(Roots & roots, bool censor)
} }
struct GCLimitReached { }; struct GCLimitReached : std::exception { };
void LocalStore::collectGarbage(const GCOptions & options, GCResults & results) /**
{ * Delegate class to expose just the operations required to perform GC on a store.
bool shouldDelete = options.action == GCOptions::gcDeleteDead || options.action == GCOptions::gcDeleteSpecific; */
bool gcKeepOutputs = settings.gcKeepOutputs; class GCStoreDelegate {
bool gcKeepDerivations = settings.gcKeepDerivations; LocalStore const & store;
StorePathSet roots, dead, alive; public:
GCStoreDelegate(LocalStore const & store) : store(store) {}
std::optional<StorePath> maybeParseStorePath(std::string_view path) const {
return store.maybeParseStorePath(path);
}
};
/**
* Class holding a server to receive new GC roots.
*/
class GCOperation {
const GCStoreDelegate store;
std::thread serverThread;
Pipe shutdownPipe;
AutoCloseFD fdServer;
struct Shared struct Shared
{ {
@ -381,9 +399,165 @@ void LocalStore::collectGarbage(const GCOptions & options, GCResults & results)
std::optional<std::string> pending; std::optional<std::string> pending;
}; };
Sync<Shared> _shared; void runServerThread();
std::condition_variable wakeup; std::condition_variable wakeup;
Sync<Shared> _shared;
public:
GCOperation(LocalStore const & store, Path stateDir) : store(store)
{
/* Start the server for receiving new roots. */
shutdownPipe.create();
auto socketPath = stateDir + gcSocketPath;
createDirs(dirOf(socketPath));
fdServer = createUnixDomainSocket(socketPath, 0666);
if (fcntl(fdServer.get(), F_SETFL, fcntl(fdServer.get(), F_GETFL) | O_NONBLOCK) == -1) {
throw SysError("making socket '%1%' non-blocking", socketPath);
}
serverThread = std::thread([this]() { runServerThread(); });
}
void addTempRoot(std::string rootHashPart)
{
_shared.lock()->tempRoots.insert(rootHashPart);
}
void releasePending()
{
auto shared(_shared.lock());
shared->pending.reset();
wakeup.notify_all();
}
/**
* Marks a path as pending deletion if it is not in tempRoots.
*
* Returns whether it was marked for deletion.
*/
bool markPendingIfPresent(std::string const & hashPart)
{
auto shared(_shared.lock());
if (shared->tempRoots.count(hashPart)) {
return false;
}
shared->pending = hashPart;
return true;
}
~GCOperation();
};
void GCOperation::runServerThread()
{
Sync<std::map<int, std::thread>> connections;
Finally cleanup([&]() {
debug("GC roots server shutting down");
fdServer.close();
while (true) {
auto item = remove_begin(*connections.lock());
if (!item) break;
auto & [fd, thread] = *item;
shutdown(fd, SHUT_RDWR);
thread.join();
}
});
while (true) {
std::vector<struct pollfd> fds;
fds.push_back({.fd = shutdownPipe.readSide.get(), .events = POLLIN});
fds.push_back({.fd = fdServer.get(), .events = POLLIN});
auto count = poll(fds.data(), fds.size(), -1);
assert(count != -1);
if (fds[0].revents)
/* Parent is asking us to quit. */
break;
if (fds[1].revents) {
/* Accept a new connection. */
assert(fds[1].revents & POLLIN);
AutoCloseFD fdClient{accept(fdServer.get(), nullptr, nullptr)};
if (!fdClient) continue;
debug("GC roots server accepted new client");
/* Process the connection in a separate thread. */
auto fdClient_ = fdClient.get();
std::thread clientThread([&, fdClient = std::move(fdClient)]() {
Finally cleanup([&]() {
auto conn(connections.lock());
auto i = conn->find(fdClient.get());
if (i != conn->end()) {
i->second.detach();
conn->erase(i);
}
});
/* On macOS, accepted sockets inherit the
non-blocking flag from the server socket, so
explicitly make it blocking. */
if (fcntl(fdClient.get(), F_SETFL, fcntl(fdClient.get(), F_GETFL) & ~O_NONBLOCK) == -1)
abort();
while (true) {
try {
auto path = readLine(fdClient.get());
auto storePath = store.maybeParseStorePath(path);
if (storePath) {
debug("got new GC root '%s'", path);
auto hashPart = std::string(storePath->hashPart());
auto shared(_shared.lock());
shared->tempRoots.insert(hashPart);
/* If this path is currently being
deleted, then we have to wait until
deletion is finished to ensure that
the client doesn't start
re-creating it before we're
done. FIXME: ideally we would use a
FD for this so we don't block the
poll loop. */
while (shared->pending == hashPart) {
debug("synchronising with deletion of path '%s'", path);
shared.wait(wakeup);
}
} else
printError("received garbage instead of a root from client");
writeFull(fdClient.get(), "1", false);
} catch (Error & e) {
debug("reading GC root from client: %s", e.msg());
break;
}
}
});
connections.lock()->insert({fdClient_, std::move(clientThread)});
}
}
}
GCOperation::~GCOperation()
{
writeFull(shutdownPipe.writeSide.get(), "x", false);
{
auto shared(_shared.lock());
wakeup.notify_all();
}
if (serverThread.joinable()) serverThread.join();
}
void LocalStore::collectGarbage(const GCOptions & options, GCResults & results)
{
bool shouldDelete = options.action == GCOptions::gcDeleteDead || options.action == GCOptions::gcDeleteSpecific;
bool gcKeepOutputs = settings.gcKeepOutputs;
bool gcKeepDerivations = settings.gcKeepDerivations;
StorePathSet roots, dead, alive;
/* Using `--ignore-liveness' with `--delete' can have unintended /* Using `--ignore-liveness' with `--delete' can have unintended
consequences if `keep-outputs' or `keep-derivations' are true consequences if `keep-outputs' or `keep-derivations' are true
@ -395,7 +569,7 @@ void LocalStore::collectGarbage(const GCOptions & options, GCResults & results)
} }
if (shouldDelete) if (shouldDelete)
deletePath(reservedPath); deletePath(reservedSpacePath);
/* Acquire the global GC root. Note: we don't use fdGCLock /* Acquire the global GC root. Note: we don't use fdGCLock
here because then in auto-gc mode, another thread could here because then in auto-gc mode, another thread could
@ -408,110 +582,7 @@ void LocalStore::collectGarbage(const GCOptions & options, GCResults & results)
if (auto p = getEnv("_NIX_TEST_GC_SYNC_1")) if (auto p = getEnv("_NIX_TEST_GC_SYNC_1"))
readFile(*p); readFile(*p);
/* Start the server for receiving new roots. */ GCOperation gcServer {*this, stateDir.get()};
auto socketPath = stateDir.get() + gcSocketPath;
createDirs(dirOf(socketPath));
auto fdServer = createUnixDomainSocket(socketPath, 0666);
if (fcntl(fdServer.get(), F_SETFL, fcntl(fdServer.get(), F_GETFL) | O_NONBLOCK) == -1)
throw SysError("making socket '%1%' non-blocking", socketPath);
Pipe shutdownPipe;
shutdownPipe.create();
std::thread serverThread([&]() {
Sync<std::map<int, std::thread>> connections;
Finally cleanup([&]() {
debug("GC roots server shutting down");
fdServer.close();
while (true) {
auto item = remove_begin(*connections.lock());
if (!item) break;
auto & [fd, thread] = *item;
shutdown(fd, SHUT_RDWR);
thread.join();
}
});
while (true) {
std::vector<struct pollfd> fds;
fds.push_back({.fd = shutdownPipe.readSide.get(), .events = POLLIN});
fds.push_back({.fd = fdServer.get(), .events = POLLIN});
auto count = poll(fds.data(), fds.size(), -1);
assert(count != -1);
if (fds[0].revents)
/* Parent is asking us to quit. */
break;
if (fds[1].revents) {
/* Accept a new connection. */
assert(fds[1].revents & POLLIN);
AutoCloseFD fdClient{accept(fdServer.get(), nullptr, nullptr)};
if (!fdClient) continue;
debug("GC roots server accepted new client");
/* Process the connection in a separate thread. */
auto fdClient_ = fdClient.get();
std::thread clientThread([&, fdClient = std::move(fdClient)]() {
Finally cleanup([&]() {
auto conn(connections.lock());
auto i = conn->find(fdClient.get());
if (i != conn->end()) {
i->second.detach();
conn->erase(i);
}
});
/* On macOS, accepted sockets inherit the
non-blocking flag from the server socket, so
explicitly make it blocking. */
if (fcntl(fdClient.get(), F_SETFL, fcntl(fdClient.get(), F_GETFL) & ~O_NONBLOCK) == -1)
abort();
while (true) {
try {
auto path = readLine(fdClient.get());
auto storePath = maybeParseStorePath(path);
if (storePath) {
debug("got new GC root '%s'", path);
auto hashPart = std::string(storePath->hashPart());
auto shared(_shared.lock());
shared->tempRoots.insert(hashPart);
/* If this path is currently being
deleted, then we have to wait until
deletion is finished to ensure that
the client doesn't start
re-creating it before we're
done. FIXME: ideally we would use a
FD for this so we don't block the
poll loop. */
while (shared->pending == hashPart) {
debug("synchronising with deletion of path '%s'", path);
shared.wait(wakeup);
}
} else
printError("received garbage instead of a root from client");
writeFull(fdClient.get(), "1", false);
} catch (Error & e) {
debug("reading GC root from client: %s", e.msg());
break;
}
}
});
connections.lock()->insert({fdClient_, std::move(clientThread)});
}
}
});
Finally stopServer([&]() {
writeFull(shutdownPipe.writeSide.get(), "x", false);
wakeup.notify_all();
if (serverThread.joinable()) serverThread.join();
});
/* Find the roots. Since we've grabbed the GC lock, the set of /* Find the roots. Since we've grabbed the GC lock, the set of
permanent roots cannot increase now. */ permanent roots cannot increase now. */
@ -527,7 +598,7 @@ void LocalStore::collectGarbage(const GCOptions & options, GCResults & results)
Roots tempRoots; Roots tempRoots;
findTempRoots(tempRoots, true); findTempRoots(tempRoots, true);
for (auto & root : tempRoots) { for (auto & root : tempRoots) {
_shared.lock()->tempRoots.insert(std::string(root.first.hashPart())); gcServer.addTempRoot(std::string(root.first.hashPart()));
roots.insert(root.first); roots.insert(root.first);
} }
@ -580,9 +651,7 @@ void LocalStore::collectGarbage(const GCOptions & options, GCResults & results)
/* Wake up any GC client waiting for deletion of the paths in /* Wake up any GC client waiting for deletion of the paths in
'visited' to finish. */ 'visited' to finish. */
Finally releasePending([&]() { Finally releasePending([&]() {
auto shared(_shared.lock()); gcServer.releasePending();
shared->pending.reset();
wakeup.notify_all();
}); });
auto enqueue = [&](const StorePath & path) { auto enqueue = [&](const StorePath & path) {
@ -629,14 +698,9 @@ void LocalStore::collectGarbage(const GCOptions & options, GCResults & results)
&& !options.pathsToDelete.count(*path)) && !options.pathsToDelete.count(*path))
return; return;
{ if (!gcServer.markPendingIfPresent(std::string(path->hashPart()))) {
auto hashPart = std::string(path->hashPart()); debug("cannot delete '%s' because it's a temporary root", printStorePath(*path));
auto shared(_shared.lock()); return markAlive();
if (shared->tempRoots.count(hashPart)) {
debug("cannot delete '%s' because it's a temporary root", printStorePath(*path));
return markAlive();
}
shared->pending = hashPart;
} }
if (isValidPath(*path)) { if (isValidPath(*path)) {

View file

@ -181,7 +181,7 @@ LocalStore::LocalStore(const Params & params)
, LocalFSStore(params) , LocalFSStore(params)
, dbDir(stateDir + "/db") , dbDir(stateDir + "/db")
, linksDir(realStoreDir + "/.links") , linksDir(realStoreDir + "/.links")
, reservedPath(dbDir + "/reserved") , reservedSpacePath(dbDir + "/reserved")
, schemaPath(dbDir + "/schema") , schemaPath(dbDir + "/schema")
, tempRootsDir(stateDir + "/temproots") , tempRootsDir(stateDir + "/temproots")
, fnTempRoots(fmt("%s/%d", tempRootsDir, getpid())) , fnTempRoots(fmt("%s/%d", tempRootsDir, getpid()))
@ -259,10 +259,10 @@ LocalStore::LocalStore(const Params & params)
before doing a garbage collection. */ before doing a garbage collection. */
try { try {
struct stat st; struct stat st;
if (stat(reservedPath.c_str(), &st) == -1 || if (stat(reservedSpacePath.c_str(), &st) == -1 ||
st.st_size != settings.reservedSize) st.st_size != settings.reservedSize)
{ {
AutoCloseFD fd{open(reservedPath.c_str(), O_WRONLY | O_CREAT | O_CLOEXEC, 0600)}; AutoCloseFD fd{open(reservedSpacePath.c_str(), O_WRONLY | O_CREAT | O_CLOEXEC, 0600)};
int res = -1; int res = -1;
#if HAVE_POSIX_FALLOCATE #if HAVE_POSIX_FALLOCATE
res = posix_fallocate(fd.get(), 0, settings.reservedSize); res = posix_fallocate(fd.get(), 0, settings.reservedSize);

View file

@ -119,7 +119,8 @@ public:
const Path dbDir; const Path dbDir;
const Path linksDir; const Path linksDir;
const Path reservedPath; /** Path kept around to reserve some filesystem space to be able to begin a garbage collection */
const Path reservedSpacePath;
const Path schemaPath; const Path schemaPath;
const Path tempRootsDir; const Path tempRootsDir;
const Path fnTempRoots; const Path fnTempRoots;

View file

@ -33,7 +33,7 @@ sleep 2
pid2=$! pid2=$!
# Start a build. This should not be blocked by the GC in progress. # Start a build. This should not be blocked by the GC in progress.
outPath=$(nix-build --max-silent-time 60 -o "$TEST_ROOT/result" -E " outPath=$(nix-build --max-silent-time 60 --debug -o "$TEST_ROOT/result" -E "
with import ./config.nix; with import ./config.nix;
mkDerivation { mkDerivation {
name = \"non-blocking\"; name = \"non-blocking\";