From 9b62c52e5c61829f1b75d4f97c0b63bdf4dd64ea Mon Sep 17 00:00:00 2001 From: Maximilian Bosch Date: Thu, 14 Mar 2024 22:47:37 +0100 Subject: [PATCH] hydra-queue-runner: drop broken connections from pool Closes #1336 When restarting postgresql, the connections are still reused in `hydra-queue-runner` causing errors like this main thread: Lost connection to the database server. queue monitor: Lost connection to the database server. and no more builds being processed. `hydra-evaluator` doesn't have that issue since it crashes right away. We could let it retry indefinitely as well (see below), but I don't want to change too much. If the DB is still unreachable 10s later, the process will stop with a non-zero exit code because of a missing DB connection. This however isn't such a big deal because it will be immediately restarted afterwards. With the current configuration, Hydra will never give up, but restart (and retry) infinitely. To me that seems reasonable, i.e. to retry DB connections on a long-running process. If this doesn't work out, the monitoring should fire anyways because the queue fills up, but I'm open to discuss that. Please note that this isn't reproducible with the DB and the queue runner on the same machine when using `services.hydra-dev`, because of the `Requires=` dependency `hydra-queue-runner.service` -> `hydra-init.service` -> `postgresql.service` that causes the queue runner to be restarted on `systemctl restart postgresql`. Internally, Hydra uses Nix's pool data structure: it basically has N slots (here DB connections) and whenever a new one is requested, an idle slot is provided or a new one is created (when N slots are active, it'll be waited until one slot is free). The issue in the code here is however that whenever an error is encountered, the slot is released, however the same broken connection will be reused the next time. By using `Pool::Handle::markBad`, Nix will drop a broken slot. This is now being done when `pqxx::broken_connection` was caught. --- src/hydra-queue-runner/hydra-queue-runner.cc | 15 ++++++--- src/hydra-queue-runner/queue-monitor.cc | 34 +++++++++++--------- src/hydra-queue-runner/state.hh | 2 +- 3 files changed, 31 insertions(+), 20 deletions(-) diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 0f4e759c..dffd8ad6 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -928,10 +928,17 @@ void State::run(BuildID buildOne) while (true) { try { auto conn(dbPool.get()); - receiver dumpStatus_(*conn, "dump_status"); - while (true) { - conn->await_notification(); - dumpStatus(*conn); + try { + receiver dumpStatus_(*conn, "dump_status"); + while (true) { + conn->await_notification(); + dumpStatus(*conn); + } + } catch (pqxx::broken_connection & connEx) { + printMsg(lvlError, "main thread: %s", connEx.what()); + printMsg(lvlError, "main thread: Reconnecting in 10s"); + conn.markBad(); + sleep(10); } } catch (std::exception & e) { printMsg(lvlError, "main thread: %s", e.what()); diff --git a/src/hydra-queue-runner/queue-monitor.cc b/src/hydra-queue-runner/queue-monitor.cc index d7214c43..ce5eef3f 100644 --- a/src/hydra-queue-runner/queue-monitor.cc +++ b/src/hydra-queue-runner/queue-monitor.cc @@ -10,8 +10,14 @@ using namespace nix; void State::queueMonitor() { while (true) { + auto conn(dbPool.get()); try { - queueMonitorLoop(); + queueMonitorLoop(*conn); + } catch (pqxx::broken_connection & e) { + printMsg(lvlError, "queue monitor: %s", e.what()); + printMsg(lvlError, "queue monitor: Reconnecting in 10s"); + conn.markBad(); + sleep(10); } catch (std::exception & e) { printError("queue monitor: %s", e.what()); sleep(10); // probably a DB problem, so don't retry right away @@ -20,16 +26,14 @@ void State::queueMonitor() } -void State::queueMonitorLoop() +void State::queueMonitorLoop(Connection & conn) { - auto conn(dbPool.get()); - - receiver buildsAdded(*conn, "builds_added"); - receiver buildsRestarted(*conn, "builds_restarted"); - receiver buildsCancelled(*conn, "builds_cancelled"); - receiver buildsDeleted(*conn, "builds_deleted"); - receiver buildsBumped(*conn, "builds_bumped"); - receiver jobsetSharesChanged(*conn, "jobset_shares_changed"); + receiver buildsAdded(conn, "builds_added"); + receiver buildsRestarted(conn, "builds_restarted"); + receiver buildsCancelled(conn, "builds_cancelled"); + receiver buildsDeleted(conn, "builds_deleted"); + receiver buildsBumped(conn, "builds_bumped"); + receiver jobsetSharesChanged(conn, "jobset_shares_changed"); auto destStore = getDestStore(); @@ -39,17 +43,17 @@ void State::queueMonitorLoop() while (!quit) { localStore->clearPathInfoCache(); - bool done = getQueuedBuilds(*conn, destStore, lastBuildId); + bool done = getQueuedBuilds(conn, destStore, lastBuildId); if (buildOne && buildOneDone) quit = true; /* Sleep until we get notification from the database about an event. */ if (done && !quit) { - conn->await_notification(); + conn.await_notification(); nrQueueWakeups++; } else - conn->get_notifs(); + conn.get_notifs(); if (auto lowestId = buildsAdded.get()) { lastBuildId = std::min(lastBuildId, static_cast(std::stoul(*lowestId) - 1)); @@ -61,11 +65,11 @@ void State::queueMonitorLoop() } if (buildsCancelled.get() || buildsDeleted.get() || buildsBumped.get()) { printMsg(lvlTalkative, "got notification: builds cancelled or bumped"); - processQueueChange(*conn); + processQueueChange(conn); } if (jobsetSharesChanged.get()) { printMsg(lvlTalkative, "got notification: jobset shares changed"); - processJobsetSharesChange(*conn); + processJobsetSharesChange(conn); } } diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index d1c78c68..d9e969e4 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -531,7 +531,7 @@ private: void queueMonitor(); - void queueMonitorLoop(); + void queueMonitorLoop(Connection & conn); /* Check the queue for new builds. */ bool getQueuedBuilds(Connection & conn,