diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index 66d63f12..6127a793 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -117,7 +117,7 @@ static void copyClosureTo(ref destStore, void State::buildRemote(ref destStore, Machine::ptr machine, Step::ptr step, unsigned int maxSilentTime, unsigned int buildTimeout, - RemoteResult & result) + RemoteResult & result, std::shared_ptr activeStep) { assert(BuildResult::TimedOut == 8); @@ -138,6 +138,24 @@ void State::buildRemote(ref destStore, Child child; openConnection(machine, tmpDir, logFD.get(), child); + { + auto activeStepState(activeStep->state_.lock()); + if (activeStepState->cancelled) throw Error("step cancelled"); + activeStepState->pid = child.pid; + } + + Finally clearPid([&]() { + auto activeStepState(activeStep->state_.lock()); + activeStepState->pid = -1; + + /* FIXME: there is a slight race here with step + cancellation in State::processQueueChange(), which + could call kill() on this pid after we've done waitpid() + on it. With pid wrap-around, there is a tiny + possibility that we end up killing another + process. Meh. */ + }); + FdSource from(child.from.get()); FdSink to(child.to.get()); diff --git a/src/hydra-queue-runner/builder.cc b/src/hydra-queue-runner/builder.cc index c37332d5..ee3ac779 100644 --- a/src/hydra-queue-runner/builder.cc +++ b/src/hydra-queue-runner/builder.cc @@ -15,11 +15,9 @@ void State::builder(MachineReservation::ptr reservation) auto activeStep = std::make_shared(); activeStep->step = reservation->step; - activeStep->threadId = pthread_self(); activeSteps_.lock()->insert(activeStep); Finally removeActiveStep([&]() { - activeStep->threadId = -1; activeSteps_.lock()->erase(activeStep); }); @@ -27,7 +25,7 @@ void State::builder(MachineReservation::ptr reservation) try { auto destStore = getDestStore(); - res = doBuildStep(destStore, step, reservation->machine); + res = doBuildStep(destStore, reservation, activeStep); } catch (std::exception & e) { printMsg(lvlError, format("uncaught exception building ‘%1%’ on ‘%2%’: %3%") % step->drvPath % reservation->machine->sshName % e.what()); @@ -56,9 +54,13 @@ void State::builder(MachineReservation::ptr reservation) } -State::StepResult State::doBuildStep(nix::ref destStore, Step::ptr step, - Machine::ptr machine) +State::StepResult State::doBuildStep(nix::ref destStore, + MachineReservation::ptr reservation, + std::shared_ptr activeStep) { + auto & step(reservation->step); + auto & machine(reservation->machine); + { auto step_(step->state.lock()); assert(step_->created); @@ -156,26 +158,19 @@ State::StepResult State::doBuildStep(nix::ref destStore, Step::ptr step, /* Do the build. */ try { /* FIXME: referring builds may have conflicting timeouts. */ - buildRemote(destStore, machine, step, maxSilentTime, buildTimeout, result); + buildRemote(destStore, machine, step, maxSilentTime, buildTimeout, result, activeStep); } catch (NoTokens & e) { result.stepStatus = bsNarSizeLimitExceeded; } catch (Error & e) { - result.stepStatus = bsAborted; - result.errorMsg = e.msg(); - result.canRetry = true; - } catch (__cxxabiv1::__forced_unwind & e) { - /* The queue monitor thread cancelled this step. */ - try { + if (activeStep->state_.lock()->cancelled) { printInfo("marking step %d of build %d as cancelled", stepNr, buildId); - pqxx::work txn(*conn); - finishBuildStep(txn, result.startTime, time(0), result.overhead, buildId, - stepNr, machine->sshName, bsCancelled, ""); - txn.commit(); - stepFinished = true; - } catch (...) { - ignoreException(); + result.stepStatus = bsCancelled; + result.canRetry = false; + } else { + result.stepStatus = bsAborted; + result.errorMsg = e.msg(); + result.canRetry = true; } - throw; } if (result.stepStatus == bsSuccess) diff --git a/src/hydra-queue-runner/queue-monitor.cc b/src/hydra-queue-runner/queue-monitor.cc index d72d1989..644919d7 100644 --- a/src/hydra-queue-runner/queue-monitor.cc +++ b/src/hydra-queue-runner/queue-monitor.cc @@ -337,20 +337,23 @@ void State::processQueueChange(Connection & conn) { auto activeSteps(activeSteps_.lock()); for (auto & activeStep : *activeSteps) { - auto threadId = activeStep->threadId; // FIXME: use Sync or atomic? - if (threadId == 0) continue; - std::set dependents; std::set steps; getDependents(activeStep->step, dependents, steps); if (!dependents.empty()) continue; - printInfo("cancelling thread for build step ‘%s’", activeStep->step->drvPath); - - int err = pthread_cancel(threadId); - if (err) - printError("error cancelling thread for build step ‘%s’: %s", - activeStep->step->drvPath, strerror(err)); + { + auto activeStepState(activeStep->state_.lock()); + if (activeStepState->cancelled) continue; + activeStepState->cancelled = true; + if (activeStepState->pid != -1) { + printInfo("killing builder process %d of build step ‘%s’", + activeStepState->pid, activeStep->step->drvPath); + if (kill(activeStepState->pid, SIGINT) == -1) + printError("error killing build step ‘%s’: %s", + activeStep->step->drvPath, strerror(errno)); + } + } } } } diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index 7de2af86..b02491f2 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -377,7 +377,14 @@ private: struct ActiveStep { Step::ptr step; - pthread_t threadId; + + struct State + { + pid_t pid = -1; + bool cancelled = false; + }; + + nix::Sync state_; }; nix::Sync>> activeSteps_; @@ -476,12 +483,13 @@ private: retried. */ enum StepResult { sDone, sRetry, sMaybeCancelled }; StepResult doBuildStep(nix::ref destStore, - Step::ptr step, Machine::ptr machine); + MachineReservation::ptr reservation, + std::shared_ptr activeStep); void buildRemote(nix::ref destStore, Machine::ptr machine, Step::ptr step, unsigned int maxSilentTime, unsigned int buildTimeout, - RemoteResult & result); + RemoteResult & result, std::shared_ptr activeStep); void markSucceededBuild(pqxx::work & txn, Build::ptr build, const BuildOutput & res, bool isCachedBuild, time_t startTime, time_t stopTime);