forked from lix-project/hydra
Step cancellation: Don't use pthread_cancel()
This was a bad idea because pthread_cancel() is unsalvageably broken in C++. Destructors are not allowed to throw exceptions (especially in C++11), but pthread_cancel() can cause a __cxxabiv1::__forced_unwind exception inside any destructor that invokes a cancellation point. (This exception can be caught but *must* be rethrown.) So let's just kill the builder process instead.
This commit is contained in:
parent
95aa1f0590
commit
7863d2e1da
4 changed files with 57 additions and 33 deletions
|
@ -117,7 +117,7 @@ static void copyClosureTo(ref<Store> destStore,
|
||||||
void State::buildRemote(ref<Store> destStore,
|
void State::buildRemote(ref<Store> destStore,
|
||||||
Machine::ptr machine, Step::ptr step,
|
Machine::ptr machine, Step::ptr step,
|
||||||
unsigned int maxSilentTime, unsigned int buildTimeout,
|
unsigned int maxSilentTime, unsigned int buildTimeout,
|
||||||
RemoteResult & result)
|
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep)
|
||||||
{
|
{
|
||||||
assert(BuildResult::TimedOut == 8);
|
assert(BuildResult::TimedOut == 8);
|
||||||
|
|
||||||
|
@ -138,6 +138,24 @@ void State::buildRemote(ref<Store> destStore,
|
||||||
Child child;
|
Child child;
|
||||||
openConnection(machine, tmpDir, logFD.get(), child);
|
openConnection(machine, tmpDir, logFD.get(), child);
|
||||||
|
|
||||||
|
{
|
||||||
|
auto activeStepState(activeStep->state_.lock());
|
||||||
|
if (activeStepState->cancelled) throw Error("step cancelled");
|
||||||
|
activeStepState->pid = child.pid;
|
||||||
|
}
|
||||||
|
|
||||||
|
Finally clearPid([&]() {
|
||||||
|
auto activeStepState(activeStep->state_.lock());
|
||||||
|
activeStepState->pid = -1;
|
||||||
|
|
||||||
|
/* FIXME: there is a slight race here with step
|
||||||
|
cancellation in State::processQueueChange(), which
|
||||||
|
could call kill() on this pid after we've done waitpid()
|
||||||
|
on it. With pid wrap-around, there is a tiny
|
||||||
|
possibility that we end up killing another
|
||||||
|
process. Meh. */
|
||||||
|
});
|
||||||
|
|
||||||
FdSource from(child.from.get());
|
FdSource from(child.from.get());
|
||||||
FdSink to(child.to.get());
|
FdSink to(child.to.get());
|
||||||
|
|
||||||
|
|
|
@ -15,11 +15,9 @@ void State::builder(MachineReservation::ptr reservation)
|
||||||
|
|
||||||
auto activeStep = std::make_shared<ActiveStep>();
|
auto activeStep = std::make_shared<ActiveStep>();
|
||||||
activeStep->step = reservation->step;
|
activeStep->step = reservation->step;
|
||||||
activeStep->threadId = pthread_self();
|
|
||||||
activeSteps_.lock()->insert(activeStep);
|
activeSteps_.lock()->insert(activeStep);
|
||||||
|
|
||||||
Finally removeActiveStep([&]() {
|
Finally removeActiveStep([&]() {
|
||||||
activeStep->threadId = -1;
|
|
||||||
activeSteps_.lock()->erase(activeStep);
|
activeSteps_.lock()->erase(activeStep);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -27,7 +25,7 @@ void State::builder(MachineReservation::ptr reservation)
|
||||||
|
|
||||||
try {
|
try {
|
||||||
auto destStore = getDestStore();
|
auto destStore = getDestStore();
|
||||||
res = doBuildStep(destStore, step, reservation->machine);
|
res = doBuildStep(destStore, reservation, activeStep);
|
||||||
} catch (std::exception & e) {
|
} catch (std::exception & e) {
|
||||||
printMsg(lvlError, format("uncaught exception building ‘%1%’ on ‘%2%’: %3%")
|
printMsg(lvlError, format("uncaught exception building ‘%1%’ on ‘%2%’: %3%")
|
||||||
% step->drvPath % reservation->machine->sshName % e.what());
|
% step->drvPath % reservation->machine->sshName % e.what());
|
||||||
|
@ -56,9 +54,13 @@ void State::builder(MachineReservation::ptr reservation)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
State::StepResult State::doBuildStep(nix::ref<Store> destStore, Step::ptr step,
|
State::StepResult State::doBuildStep(nix::ref<Store> destStore,
|
||||||
Machine::ptr machine)
|
MachineReservation::ptr reservation,
|
||||||
|
std::shared_ptr<ActiveStep> activeStep)
|
||||||
{
|
{
|
||||||
|
auto & step(reservation->step);
|
||||||
|
auto & machine(reservation->machine);
|
||||||
|
|
||||||
{
|
{
|
||||||
auto step_(step->state.lock());
|
auto step_(step->state.lock());
|
||||||
assert(step_->created);
|
assert(step_->created);
|
||||||
|
@ -156,26 +158,19 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore, Step::ptr step,
|
||||||
/* Do the build. */
|
/* Do the build. */
|
||||||
try {
|
try {
|
||||||
/* FIXME: referring builds may have conflicting timeouts. */
|
/* FIXME: referring builds may have conflicting timeouts. */
|
||||||
buildRemote(destStore, machine, step, maxSilentTime, buildTimeout, result);
|
buildRemote(destStore, machine, step, maxSilentTime, buildTimeout, result, activeStep);
|
||||||
} catch (NoTokens & e) {
|
} catch (NoTokens & e) {
|
||||||
result.stepStatus = bsNarSizeLimitExceeded;
|
result.stepStatus = bsNarSizeLimitExceeded;
|
||||||
} catch (Error & e) {
|
} catch (Error & e) {
|
||||||
result.stepStatus = bsAborted;
|
if (activeStep->state_.lock()->cancelled) {
|
||||||
result.errorMsg = e.msg();
|
|
||||||
result.canRetry = true;
|
|
||||||
} catch (__cxxabiv1::__forced_unwind & e) {
|
|
||||||
/* The queue monitor thread cancelled this step. */
|
|
||||||
try {
|
|
||||||
printInfo("marking step %d of build %d as cancelled", stepNr, buildId);
|
printInfo("marking step %d of build %d as cancelled", stepNr, buildId);
|
||||||
pqxx::work txn(*conn);
|
result.stepStatus = bsCancelled;
|
||||||
finishBuildStep(txn, result.startTime, time(0), result.overhead, buildId,
|
result.canRetry = false;
|
||||||
stepNr, machine->sshName, bsCancelled, "");
|
} else {
|
||||||
txn.commit();
|
result.stepStatus = bsAborted;
|
||||||
stepFinished = true;
|
result.errorMsg = e.msg();
|
||||||
} catch (...) {
|
result.canRetry = true;
|
||||||
ignoreException();
|
|
||||||
}
|
}
|
||||||
throw;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (result.stepStatus == bsSuccess)
|
if (result.stepStatus == bsSuccess)
|
||||||
|
|
|
@ -337,20 +337,23 @@ void State::processQueueChange(Connection & conn)
|
||||||
{
|
{
|
||||||
auto activeSteps(activeSteps_.lock());
|
auto activeSteps(activeSteps_.lock());
|
||||||
for (auto & activeStep : *activeSteps) {
|
for (auto & activeStep : *activeSteps) {
|
||||||
auto threadId = activeStep->threadId; // FIXME: use Sync or atomic?
|
|
||||||
if (threadId == 0) continue;
|
|
||||||
|
|
||||||
std::set<Build::ptr> dependents;
|
std::set<Build::ptr> dependents;
|
||||||
std::set<Step::ptr> steps;
|
std::set<Step::ptr> steps;
|
||||||
getDependents(activeStep->step, dependents, steps);
|
getDependents(activeStep->step, dependents, steps);
|
||||||
if (!dependents.empty()) continue;
|
if (!dependents.empty()) continue;
|
||||||
|
|
||||||
printInfo("cancelling thread for build step ‘%s’", activeStep->step->drvPath);
|
{
|
||||||
|
auto activeStepState(activeStep->state_.lock());
|
||||||
int err = pthread_cancel(threadId);
|
if (activeStepState->cancelled) continue;
|
||||||
if (err)
|
activeStepState->cancelled = true;
|
||||||
printError("error cancelling thread for build step ‘%s’: %s",
|
if (activeStepState->pid != -1) {
|
||||||
activeStep->step->drvPath, strerror(err));
|
printInfo("killing builder process %d of build step ‘%s’",
|
||||||
|
activeStepState->pid, activeStep->step->drvPath);
|
||||||
|
if (kill(activeStepState->pid, SIGINT) == -1)
|
||||||
|
printError("error killing build step ‘%s’: %s",
|
||||||
|
activeStep->step->drvPath, strerror(errno));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -377,7 +377,14 @@ private:
|
||||||
struct ActiveStep
|
struct ActiveStep
|
||||||
{
|
{
|
||||||
Step::ptr step;
|
Step::ptr step;
|
||||||
pthread_t threadId;
|
|
||||||
|
struct State
|
||||||
|
{
|
||||||
|
pid_t pid = -1;
|
||||||
|
bool cancelled = false;
|
||||||
|
};
|
||||||
|
|
||||||
|
nix::Sync<State> state_;
|
||||||
};
|
};
|
||||||
|
|
||||||
nix::Sync<std::set<std::shared_ptr<ActiveStep>>> activeSteps_;
|
nix::Sync<std::set<std::shared_ptr<ActiveStep>>> activeSteps_;
|
||||||
|
@ -476,12 +483,13 @@ private:
|
||||||
retried. */
|
retried. */
|
||||||
enum StepResult { sDone, sRetry, sMaybeCancelled };
|
enum StepResult { sDone, sRetry, sMaybeCancelled };
|
||||||
StepResult doBuildStep(nix::ref<nix::Store> destStore,
|
StepResult doBuildStep(nix::ref<nix::Store> destStore,
|
||||||
Step::ptr step, Machine::ptr machine);
|
MachineReservation::ptr reservation,
|
||||||
|
std::shared_ptr<ActiveStep> activeStep);
|
||||||
|
|
||||||
void buildRemote(nix::ref<nix::Store> destStore,
|
void buildRemote(nix::ref<nix::Store> destStore,
|
||||||
Machine::ptr machine, Step::ptr step,
|
Machine::ptr machine, Step::ptr step,
|
||||||
unsigned int maxSilentTime, unsigned int buildTimeout,
|
unsigned int maxSilentTime, unsigned int buildTimeout,
|
||||||
RemoteResult & result);
|
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep);
|
||||||
|
|
||||||
void markSucceededBuild(pqxx::work & txn, Build::ptr build,
|
void markSucceededBuild(pqxx::work & txn, Build::ptr build,
|
||||||
const BuildOutput & res, bool isCachedBuild, time_t startTime, time_t stopTime);
|
const BuildOutput & res, bool isCachedBuild, time_t startTime, time_t stopTime);
|
||||||
|
|
Loading…
Reference in a new issue