Use our own Thread struct instead of std::thread

We'd highly prefer using std::thread here; but this won't let us configure the stack
size. macOS uses 512KiB size stacks for non-main threads, and musl defaults to 128k.
While Nix configures a 64MiB size for the main thread, this doesn't propagate to the
threads we launch here. It turns out, running the evaluator under an anemic stack of
0.5MiB has it overflow way too quickly. Hence, we have our own custom Thread struct.
This commit is contained in:
puck 2024-06-09 13:20:20 +00:00
parent 040db2fe26
commit 11d467fecd

View file

@ -8,6 +8,7 @@
#include <sys/wait.h> #include <sys/wait.h>
#include <nlohmann/json.hpp> #include <nlohmann/json.hpp>
#include <errno.h> #include <errno.h>
#include <pthread.h>
#include <signal.h> #include <signal.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
@ -24,7 +25,6 @@
#include <lix/libutil/ref.hh> #include <lix/libutil/ref.hh>
#include <lix/libstore/store-api.hh> #include <lix/libstore/store-api.hh>
#include <map> #include <map>
#include <thread>
#include <condition_variable> #include <condition_variable>
#include <filesystem> #include <filesystem>
#include <exception> #include <exception>
@ -99,6 +99,54 @@ struct Proc {
~Proc() {} ~Proc() {}
}; };
// We'd highly prefer using std::thread here; but this won't let us configure the stack
// size. macOS uses 512KiB size stacks for non-main threads, and musl defaults to 128k.
// While Nix configures a 64MiB size for the main thread, this doesn't propagate to the
// threads we launch here. It turns out, running the evaluator under an anemic stack of
// 0.5MiB has it overflow way too quickly. Hence, we have our own custom Thread struct.
struct Thread {
pthread_t thread;
Thread(const Thread &) = delete;
Thread(Thread &&) noexcept = default;
Thread(std::function<void(void)> f) {
int s;
pthread_attr_t attr;
auto func = std::make_unique<std::function<void(void)>>(std::move(f));
if ((s = pthread_attr_init(&attr)) != 0) {
throw SysError(s, "calling pthread_attr_init");
}
if ((s = pthread_attr_setstacksize(&attr, 64 * 1024 * 1024)) != 0) {
throw SysError(s, "calling pthread_attr_setstacksize");
}
if ((s = pthread_create(&thread, &attr, Thread::init, func.release())) != 0) {
throw SysError(s, "calling pthread_launch");
}
if ((s = pthread_attr_destroy(&attr)) != 0) {
throw SysError(s, "calling pthread_attr_destroy");
}
}
void join() {
int s;
s = pthread_join(thread, nullptr);
if (s != 0) {
throw SysError(s, "calling pthread_join");
}
}
private:
static void *init(void *ptr) {
std::unique_ptr<std::function<void(void)>> func;
func.reset(static_cast<std::function<void(void)> *>(ptr));
(*func)();
return 0;
}
};
struct State { struct State {
std::set<json> todo = json::array({json::array()}); std::set<json> todo = json::array({json::array()});
std::set<json> active; std::set<json> active;
@ -332,10 +380,10 @@ int main(int argc, char **argv) {
Sync<State> state_; Sync<State> state_;
/* Start a collector thread per worker process. */ /* Start a collector thread per worker process. */
std::vector<std::thread> threads; std::vector<Thread> threads;
std::condition_variable wakeup; std::condition_variable wakeup;
for (size_t i = 0; i < myArgs.nrWorkers; i++) { for (size_t i = 0; i < myArgs.nrWorkers; i++) {
threads.emplace_back(collector, std::ref(state_), std::ref(wakeup)); threads.emplace_back(std::bind(collector, std::ref(state_), std::ref(wakeup)));
} }
for (auto &thread : threads) for (auto &thread : threads)