From 99afff03b06d6b9a3d464ede9724e29a03e91329 Mon Sep 17 00:00:00 2001 From: Maximilian Bosch Date: Thu, 14 Mar 2024 22:47:37 +0100 Subject: [PATCH] hydra-queue-runner: drop broken connections from pool Closes #1336 When restarting postgresql, the connections are still reused in `hydra-queue-runner`, causing errors like this: main thread: Lost connection to the database server. queue monitor: Lost connection to the database server. and no more builds are processed. `hydra-evaluator` doesn't have that issue since it crashes right away. We could let it retry indefinitely as well (see below), but I don't want to change too much. If the DB is still unreachable 10s later, the process will stop with a non-zero exit code because of a missing DB connection. This, however, isn't such a big deal because it will be immediately restarted afterwards. With the current configuration, Hydra will never give up, but restart (and retry) infinitely. To me that seems reasonable, i.e. to retry DB connections on a long-running process. If this doesn't work out, the monitoring should fire anyway because the queue fills up, but I'm open to discussing that. Please note that this isn't reproducible with the DB and the queue runner on the same machine when using `services.hydra-dev`, because of the `Requires=` dependency `hydra-queue-runner.service` -> `hydra-init.service` -> `postgresql.service` that causes the queue runner to be restarted on `systemctl restart postgresql`. Internally, Hydra uses Nix's pool data structure: it basically has N slots (here DB connections) and whenever a new one is requested, an idle slot is provided or a new one is created (when N slots are active, it waits until one slot is free). The issue in the code here, however, is that whenever an error is encountered the slot is released, but the same broken connection will be reused the next time. By using `Pool::Handle::markBad`, Nix will drop a broken slot. 
This is now being done when `pqxx::broken_connection` was caught. --- src/hydra-queue-runner/hydra-queue-runner.cc | 15 ++++++--- src/hydra-queue-runner/queue-monitor.cc | 34 +++++++++++--------- src/hydra-queue-runner/state.hh | 2 +- 3 files changed, 31 insertions(+), 20 deletions(-) diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 0ee710cb..5ffa7fe6 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -926,10 +926,17 @@ void State::run(BuildID buildOne) while (true) { try { auto conn(dbPool.get()); - receiver dumpStatus_(*conn, "dump_status"); - while (true) { - conn->await_notification(); - dumpStatus(*conn); + try { + receiver dumpStatus_(*conn, "dump_status"); + while (true) { + conn->await_notification(); + dumpStatus(*conn); + } + } catch (pqxx::broken_connection & connEx) { + printMsg(lvlError, "main thread: %s", connEx.what()); + printMsg(lvlError, "main thread: Reconnecting in 10s"); + conn.markBad(); + sleep(10); } } catch (std::exception & e) { printMsg(lvlError, "main thread: %s", e.what()); diff --git a/src/hydra-queue-runner/queue-monitor.cc b/src/hydra-queue-runner/queue-monitor.cc index 203f9f1d..1bd51f5a 100644 --- a/src/hydra-queue-runner/queue-monitor.cc +++ b/src/hydra-queue-runner/queue-monitor.cc @@ -10,8 +10,14 @@ using namespace nix; void State::queueMonitor() { while (true) { + auto conn(dbPool.get()); try { - queueMonitorLoop(); + queueMonitorLoop(*conn); + } catch (pqxx::broken_connection & e) { + printMsg(lvlError, "queue monitor: %s", e.what()); + printMsg(lvlError, "queue monitor: Reconnecting in 10s"); + conn.markBad(); + sleep(10); } catch (std::exception & e) { printError("queue monitor: %s", e.what()); sleep(10); // probably a DB problem, so don't retry right away @@ -20,16 +26,14 @@ void State::queueMonitor() } -void State::queueMonitorLoop() +void State::queueMonitorLoop(Connection & conn) { - auto 
conn(dbPool.get()); - - receiver buildsAdded(*conn, "builds_added"); - receiver buildsRestarted(*conn, "builds_restarted"); - receiver buildsCancelled(*conn, "builds_cancelled"); - receiver buildsDeleted(*conn, "builds_deleted"); - receiver buildsBumped(*conn, "builds_bumped"); - receiver jobsetSharesChanged(*conn, "jobset_shares_changed"); + receiver buildsAdded(conn, "builds_added"); + receiver buildsRestarted(conn, "builds_restarted"); + receiver buildsCancelled(conn, "builds_cancelled"); + receiver buildsDeleted(conn, "builds_deleted"); + receiver buildsBumped(conn, "builds_bumped"); + receiver jobsetSharesChanged(conn, "jobset_shares_changed"); auto destStore = getDestStore(); @@ -39,17 +43,17 @@ void State::queueMonitorLoop() while (!quit) { localStore->clearPathInfoCache(); - bool done = getQueuedBuilds(*conn, destStore, lastBuildId); + bool done = getQueuedBuilds(conn, destStore, lastBuildId); if (buildOne && buildOneDone) quit = true; /* Sleep until we get notification from the database about an event. 
*/ if (done && !quit) { - conn->await_notification(); + conn.await_notification(); nrQueueWakeups++; } else - conn->get_notifs(); + conn.get_notifs(); if (auto lowestId = buildsAdded.get()) { lastBuildId = std::min(lastBuildId, static_cast(std::stoul(*lowestId) - 1)); @@ -61,11 +65,11 @@ void State::queueMonitorLoop() } if (buildsCancelled.get() || buildsDeleted.get() || buildsBumped.get()) { printMsg(lvlTalkative, "got notification: builds cancelled or bumped"); - processQueueChange(*conn); + processQueueChange(conn); } if (jobsetSharesChanged.get()) { printMsg(lvlTalkative, "got notification: jobset shares changed"); - processJobsetSharesChange(*conn); + processJobsetSharesChange(conn); } } diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index cda238ae..5d242cdf 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -498,7 +498,7 @@ private: void queueMonitor(); - void queueMonitorLoop(); + void queueMonitorLoop(Connection & conn); /* Check the queue for new builds. */ bool getQueuedBuilds(Connection & conn,