Use our own Thread struct instead of std::thread #8

Merged
puck merged 2 commits from own-thread into main 2024-06-14 20:13:24 +00:00
3 changed files with 86 additions and 21 deletions

View file

@ -23,11 +23,11 @@
] ]
}, },
"locked": { "locked": {
"lastModified": 1701473968, "lastModified": 1717285511,
"narHash": "sha256-YcVE5emp1qQ8ieHUnxt1wCZCC3ZfAS+SRRWZ2TMda7E=", "narHash": "sha256-iKzJcpdXih14qYVcZ9QC9XuZYnPc6T8YImb6dX166kw=",
"owner": "hercules-ci", "owner": "hercules-ci",
"repo": "flake-parts", "repo": "flake-parts",
"rev": "34fed993f1674c8d06d58b37ce1e0fe5eebcb9f5", "rev": "2a55567fcf15b1b1c7ed712a2c6fadaec7412ea8",
"type": "github" "type": "github"
}, },
"original": { "original": {
@ -39,6 +39,7 @@
"lix": { "lix": {
"inputs": { "inputs": {
"flake-compat": "flake-compat", "flake-compat": "flake-compat",
"nix2container": "nix2container",
"nixpkgs": [ "nixpkgs": [
"nixpkgs" "nixpkgs"
], ],
@ -46,11 +47,11 @@
"pre-commit-hooks": "pre-commit-hooks" "pre-commit-hooks": "pre-commit-hooks"
}, },
"locked": { "locked": {
"lastModified": 1717081103, "lastModified": 1718228457,
"narHash": "sha256-4hrY8lIK6boX0xe6LN+OFpsmOAITl0Iam17FC8Kjslk=", "narHash": "sha256-vGumESUGu/jo2Lm5bha/xBsJKVlb1wuclXlL9xudRp4=",
"ref": "refs/heads/main", "ref": "refs/heads/main",
"rev": "c161687b5fa6e7604e99ee5df2e73388952baafb", "rev": "f46194faa2fc9c78250702c8eb7a4b756e0bd944",
"revCount": 15698, "revCount": 15757,
"type": "git", "type": "git",
"url": "https://git@git.lix.systems/lix-project/lix" "url": "https://git@git.lix.systems/lix-project/lix"
}, },
@ -66,11 +67,11 @@
] ]
}, },
"locked": { "locked": {
"lastModified": 1701208414, "lastModified": 1703863825,
"narHash": "sha256-xrQ0FyhwTZK6BwKhahIkUVZhMNk21IEI1nUcWSONtpo=", "narHash": "sha256-rXwqjtwiGKJheXB43ybM8NwWB8rO2dSRrEqes0S7F5Y=",
"owner": "nix-community", "owner": "nix-community",
"repo": "nix-github-actions", "repo": "nix-github-actions",
"rev": "93e39cc1a087d65bcf7a132e75a650c44dd2b734", "rev": "5163432afc817cf8bd1f031418d1869e4c9d5547",
"type": "github" "type": "github"
}, },
"original": { "original": {
@ -79,18 +80,34 @@
"type": "github" "type": "github"
} }
}, },
"nix2container": {
"flake": false,
"locked": {
"lastModified": 1712990762,
"narHash": "sha256-hO9W3w7NcnYeX8u8cleHiSpK2YJo7ecarFTUlbybl7k=",
"owner": "nlewo",
"repo": "nix2container",
"rev": "20aad300c925639d5d6cbe30013c8357ce9f2a2e",
"type": "github"
},
"original": {
"owner": "nlewo",
"repo": "nix2container",
"type": "github"
}
},
"nixpkgs": { "nixpkgs": {
"locked": { "locked": {
"lastModified": 1703134684, "lastModified": 1718132686,
"narHash": "sha256-SQmng1EnBFLzS7WSRyPM9HgmZP2kLJcPAz+Ug/nug6o=", "narHash": "sha256-JRinkq+FeAkYnrrK8+Bh+jtLHJBN5jDzSimk1ye00EE=",
"owner": "NixOS", "owner": "NixOS",
"repo": "nixpkgs", "repo": "nixpkgs",
"rev": "d6863cbcbbb80e71cecfc03356db1cda38919523", "rev": "96b3dae4f8753c1f5ce0d06b57fe250fb5d9b0e0",
"type": "github" "type": "github"
}, },
"original": { "original": {
"owner": "NixOS", "owner": "NixOS",
"ref": "nixpkgs-unstable", "ref": "nixos-23.11-small",
"repo": "nixpkgs", "repo": "nixpkgs",
"type": "github" "type": "github"
} }
@ -143,11 +160,11 @@
] ]
}, },
"locked": { "locked": {
"lastModified": 1702979157, "lastModified": 1718139168,
"narHash": "sha256-RnFBbLbpqtn4AoJGXKevQMCGhra4h6G2MPcuTSZZQ+g=", "narHash": "sha256-1TZQcdETNdJMcfwwoshVeCjwWfrPtkSQ8y8wFX3it7k=",
"owner": "numtide", "owner": "numtide",
"repo": "treefmt-nix", "repo": "treefmt-nix",
"rev": "2961375283668d867e64129c22af532de8e77734", "rev": "1cb529bffa880746a1d0ec4e0f5076876af931f1",
"type": "github" "type": "github"
}, },
"original": { "original": {

View file

@ -1,7 +1,7 @@
{ {
description = "Hydra's builtin hydra-eval-jobs as a standalone"; description = "Hydra's builtin hydra-eval-jobs as a standalone";
inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable"; inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.11-small";
inputs.flake-parts.url = "github:hercules-ci/flake-parts"; inputs.flake-parts.url = "github:hercules-ci/flake-parts";
inputs.flake-parts.inputs.nixpkgs-lib.follows = "nixpkgs"; inputs.flake-parts.inputs.nixpkgs-lib.follows = "nixpkgs";
inputs.treefmt-nix.url = "github:numtide/treefmt-nix"; inputs.treefmt-nix.url = "github:numtide/treefmt-nix";

View file

@ -8,6 +8,7 @@
#include <sys/wait.h> #include <sys/wait.h>
#include <nlohmann/json.hpp> #include <nlohmann/json.hpp>
#include <errno.h> #include <errno.h>
#include <pthread.h>
#include <signal.h> #include <signal.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
@ -24,7 +25,6 @@
#include <lix/libutil/ref.hh> #include <lix/libutil/ref.hh>
#include <lix/libstore/store-api.hh> #include <lix/libstore/store-api.hh>
#include <map> #include <map>
#include <thread>
#include <condition_variable> #include <condition_variable>
#include <filesystem> #include <filesystem>
#include <exception> #include <exception>
@ -99,6 +99,54 @@ struct Proc {
~Proc() {} ~Proc() {}
}; };
// We'd highly prefer using std::thread here; but this won't let us configure the stack
// size. macOS uses 512KiB size stacks for non-main threads, and musl defaults to 128k.
// While Nix configures a 64MiB size for the main thread, this doesn't propagate to the
// threads we launch here. It turns out, running the evaluator under an anemic stack of
// 0.5MiB has it overflow way too quickly. Hence, we have our own custom Thread struct.
struct Thread {
Outdated
Review

maybe our brain is constantly having too much of a crab rave, but this particular struct feels very risky lifetimes wise: if you ever destroy the backing memory without joining the thread first, you blow up the closure of the std::function, unless they miraculously made std::function a shared pointer or something.

thinking out loud: it seems potentially desirable to put the entire structure into a shared_ptr that itself internally retains a reference to as long as the thread execution inside lives. and even that feels potentially like it's not strictly correct at destruction time from an orphaned thread.

orrr on thread init you just move the std function into a stack variable, but hmmm, you actually would want to hand it something that it only has to destruct from the outer scope, to be sound against the thread taking a while to create and it getting destroyed in the mean time. yeah ok. so you would change the context type you give the inner thread to just a unique_ptr of std::function, which you .release() when you create the thread, then reconstitute into a unique_ptr inside init().

that should be trivially sound, yeah.

an alternative you could do to be suitably annoying about this is to just join the thread in the destructor, which probably would have to be able to throw. oh no how does this deal with exceptions :(

join needs to probably do stuff with exception_ptr. ok so then there does have to be shared state, fiddlesticks. probably then instead of unique_ptr of std::function you just want a little inner struct with exception pointer and the function as before, and then the context argument to thread creation is a released unique_ptr of that shared_ptr that's then reconstituted by the same manner as above. that then ensures that one ref count is leaked until the thread actually starts and reclaims it.

maybe our brain is constantly having too much of a crab rave, but this particular struct feels very risky lifetimes wise: if you ever destroy the backing memory without joining the thread first, you blow up the closure of the std::function, unless they miraculously made std::function a shared pointer or something. thinking out loud: it seems potentially desirable to put the entire structure into a shared_ptr that itself internally retains a reference to as long as the thread execution inside lives. and even that feels potentially like it's not strictly correct at destruction time from an orphaned thread. orrr on thread init you just move the std function into a stack variable, but hmmm, you actually would want to *hand* it something that *it only* has to destruct from the outer scope, to be sound against the thread taking a while to create and it getting destroyed in the mean time. yeah ok. so you would change the context type you give the inner thread to *just* a unique_ptr of std::function, which you .release() when you create the thread, then reconstitute into a unique_ptr inside init(). that should be trivially sound, yeah. an alternative you could do to be suitably annoying about this is to just join the thread in the destructor, which probably would have to be able to throw. oh no how does this deal with exceptions :( join needs to probably do stuff with exception_ptr. ok so then there does have to be shared state, fiddlesticks. probably then instead of unique_ptr of std::function you just want a little inner struct with exception pointer and the function as before, and then the context argument to thread creation is a released unique_ptr of that shared_ptr that's then reconstituted by the same manner as above. that then ensures that one ref count is leaked until the thread actually starts and reclaims it.
Outdated
Review

This should be mostly fixed now? Dropping the thread will just detach it; and joining it now behaves as intended.

This should be mostly fixed now? Dropping the thread will just detach it; and joining it now behaves as intended.
pthread_t thread;
Thread(const Thread &) = delete;
Thread(Thread &&) noexcept = default;
Thread(std::function<void(void)> f) {
int s;
pthread_attr_t attr;
auto func = std::make_unique<std::function<void(void)>>(std::move(f));
if ((s = pthread_attr_init(&attr)) != 0) {
throw SysError(s, "calling pthread_attr_init");
}
if ((s = pthread_attr_setstacksize(&attr, 64 * 1024 * 1024)) != 0) {
throw SysError(s, "calling pthread_attr_setstacksize");
}
if ((s = pthread_create(&thread, &attr, Thread::init, func.release())) != 0) {
throw SysError(s, "calling pthread_launch");
}
if ((s = pthread_attr_destroy(&attr)) != 0) {
throw SysError(s, "calling pthread_attr_destroy");
}
}
void join() {
int s;
s = pthread_join(thread, nullptr);
if (s != 0) {
throw SysError(s, "calling pthread_join");
}
}
private:
static void *init(void *ptr) {
std::unique_ptr<std::function<void(void)>> func;
func.reset(static_cast<std::function<void(void)> *>(ptr));
(*func)();
return 0;
}
};
struct State { struct State {
std::set<json> todo = json::array({json::array()}); std::set<json> todo = json::array({json::array()});
std::set<json> active; std::set<json> active;
@ -332,10 +380,10 @@ int main(int argc, char **argv) {
Sync<State> state_; Sync<State> state_;
/* Start a collector thread per worker process. */ /* Start a collector thread per worker process. */
std::vector<std::thread> threads; std::vector<Thread> threads;
std::condition_variable wakeup; std::condition_variable wakeup;
for (size_t i = 0; i < myArgs.nrWorkers; i++) { for (size_t i = 0; i < myArgs.nrWorkers; i++) {
threads.emplace_back(collector, std::ref(state_), std::ref(wakeup)); threads.emplace_back(std::bind(collector, std::ref(state_), std::ref(wakeup)));
} }
for (auto &thread : threads) for (auto &thread : threads)