Use a thread per connection

This commit is contained in:
Eelco Dolstra 2021-08-20 11:18:35 +02:00
parent ff453b06f9
commit 262520fcfe
2 changed files with 72 additions and 67 deletions

View file

@ -636,6 +636,7 @@ void LocalStore::removeUnusedLinks(const GCState & state)
} }
void LocalStore::collectGarbage(const GCOptions & options, GCResults & results) void LocalStore::collectGarbage(const GCOptions & options, GCResults & results)
{ {
GCState state(options, results); GCState state(options, results);
@ -674,49 +675,47 @@ void LocalStore::collectGarbage(const GCOptions & options, GCResults & results)
shutdownPipe.create(); shutdownPipe.create();
std::thread serverThread([&]() { std::thread serverThread([&]() {
std::map<int, std::pair<std::unique_ptr<AutoCloseFD>, std::string>> fdClients; Sync<std::map<int, std::thread>> connections;
bool quit = false; std::atomic_bool quit = false;
while (!quit) { Finally cleanup([&]() {
debug("GC roots server shutting down");
while (true) {
auto item = remove_begin(*connections.lock());
if (!item) break;
auto & [fd, thread] = *item;
shutdown(fd, SHUT_RDWR);
thread.join();
}
});
while (true) {
std::vector<struct pollfd> fds; std::vector<struct pollfd> fds;
fds.push_back({.fd = shutdownPipe.readSide.get(), .events = POLLIN}); fds.push_back({.fd = shutdownPipe.readSide.get(), .events = POLLIN});
fds.push_back({.fd = fdServer.get(), .events = POLLIN}); fds.push_back({.fd = fdServer.get(), .events = POLLIN});
for (auto & i : fdClients)
fds.push_back({.fd = i.first, .events = POLLIN});
auto count = poll(fds.data(), fds.size(), -1); auto count = poll(fds.data(), fds.size(), -1);
assert(count != -1); assert(count != -1);
for (auto & fd : fds) { if (fds[0].revents)
if (!fd.revents) continue;
if (fd.fd == shutdownPipe.readSide.get())
/* Parent is asking us to quit. */ /* Parent is asking us to quit. */
quit = true; break;
else if (fd.fd == fdServer.get()) {
if (fds[1].revents) {
/* Accept a new connection. */ /* Accept a new connection. */
assert(fd.revents & POLLIN); assert(fds[1].revents & POLLIN);
auto fdClient = std::make_unique<AutoCloseFD>(accept(fdServer.get(), nullptr, nullptr)); AutoCloseFD fdClient = accept(fdServer.get(), nullptr, nullptr);
if (*fdClient) { if (!fdClient) continue;
auto fd = fdClient->get();
fdClients.insert({fd, std::make_pair(std::move(fdClient), "")}); /* Process the connection in a separate thread. */
} auto fdClient_ = fdClient.get();
} std::thread clientThread([&, fdClient = std::move(fdClient)]() {
else { Finally cleanup([&]() {
/* Receive data from a client. */ connections.lock()->erase(fdClient.get());
auto fdClient = fdClients.find(fd.fd); });
assert(fdClient != fdClients.end());
if (fd.revents & POLLIN) {
char buf[16384];
auto n = read(fd.fd, buf, sizeof(buf));
if (n > 0) {
fdClient->second.second.append(buf, n);
/* Split the input into lines. */
while (true) { while (true) {
auto p = fdClient->second.second.find('\n'); try {
if (p == std::string::npos) break; auto path = readLine(fdClient.get());
/* We got a full line. Send ack back
to the client. */
auto path = fdClient->second.second.substr(0, p);
fdClient->second.second = fdClient->second.second.substr(p + 1);
auto storePath = maybeParseStorePath(path); auto storePath = maybeParseStorePath(path);
if (storePath) { if (storePath) {
debug("got new GC root '%s'", path); debug("got new GC root '%s'", path);
@ -724,33 +723,27 @@ void LocalStore::collectGarbage(const GCOptions & options, GCResults & results)
auto shared(state.shared.lock()); auto shared(state.shared.lock());
shared->tempRoots.insert(hashPart); shared->tempRoots.insert(hashPart);
/* If this path is currently being /* If this path is currently being
deleted, then we have to wait deleted, then we have to wait until
until deletion is finished to deletion is finished to ensure that
ensure that the client doesn't the client doesn't start
start re-creating it before re-creating it before we're
we're done. FIXME: ideally we done. FIXME: ideally we would use a
would use a FD for this so we FD for this so we don't block the
don't block the poll loop. */ poll loop. */
while (shared->pending == hashPart) { while (shared->pending == hashPart) {
debug("synchronising with deletion of path '%s'", path); debug("synchronising with deletion of path '%s'", path);
shared.wait(state.wakeup); shared.wait(state.wakeup);
} }
} else } else
printError("received garbage instead of a root from client"); printError("received garbage instead of a root from client");
// This could block, but meh. writeFull(fdClient.get(), "1", false);
try { } catch (Error &) { break; }
writeFull(fd.fd, "1", false);
} catch (SysError &) { }
}
} else if (n == 0)
fdClients.erase(fdClient);
} else
fdClients.erase(fdClient);
}
}
} }
});
debug("GC roots server shut down"); connections.lock()->insert({fdClient_, std::move(clientThread)});
}
}
}); });
Finally stopServer([&]() { Finally stopServer([&]() {

View file

@ -511,6 +511,18 @@ std::optional<typename T::mapped_type> get(const T & map, const typename T::key_
} }
/* Remove and return the first item from a container. */
template <class T>
std::optional<typename T::value_type> remove_begin(T & c)
{
auto i = c.begin();
if (i == c.end()) return {};
auto v = std::move(*i);
c.erase(i);
return v;
}
template<typename T> template<typename T>
class Callback; class Callback;