diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 0ee710cb..5ffa7fe6 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -926,10 +926,17 @@ void State::run(BuildID buildOne) while (true) { try { auto conn(dbPool.get()); - receiver dumpStatus_(*conn, "dump_status"); - while (true) { - conn->await_notification(); - dumpStatus(*conn); + try { + receiver dumpStatus_(*conn, "dump_status"); + while (true) { + conn->await_notification(); + dumpStatus(*conn); + } + } catch (pqxx::broken_connection & connEx) { + printMsg(lvlError, "main thread: %s", connEx.what()); + printMsg(lvlError, "main thread: Reconnecting in 10s"); + conn.markBad(); + sleep(10); } } catch (std::exception & e) { printMsg(lvlError, "main thread: %s", e.what()); diff --git a/src/hydra-queue-runner/queue-monitor.cc b/src/hydra-queue-runner/queue-monitor.cc index 203f9f1d..1bd51f5a 100644 --- a/src/hydra-queue-runner/queue-monitor.cc +++ b/src/hydra-queue-runner/queue-monitor.cc @@ -10,8 +10,14 @@ using namespace nix; void State::queueMonitor() { while (true) { + auto conn(dbPool.get()); try { - queueMonitorLoop(); + queueMonitorLoop(*conn); + } catch (pqxx::broken_connection & e) { + printMsg(lvlError, "queue monitor: %s", e.what()); + printMsg(lvlError, "queue monitor: Reconnecting in 10s"); + conn.markBad(); + sleep(10); } catch (std::exception & e) { printError("queue monitor: %s", e.what()); sleep(10); // probably a DB problem, so don't retry right away @@ -20,16 +26,14 @@ void State::queueMonitor() } -void State::queueMonitorLoop() +void State::queueMonitorLoop(Connection & conn) { - auto conn(dbPool.get()); - - receiver buildsAdded(*conn, "builds_added"); - receiver buildsRestarted(*conn, "builds_restarted"); - receiver buildsCancelled(*conn, "builds_cancelled"); - receiver buildsDeleted(*conn, "builds_deleted"); - receiver buildsBumped(*conn, "builds_bumped"); - receiver jobsetSharesChanged(*conn, "jobset_shares_changed"); + receiver buildsAdded(conn, "builds_added"); + receiver buildsRestarted(conn, "builds_restarted"); + receiver buildsCancelled(conn, "builds_cancelled"); + receiver buildsDeleted(conn, "builds_deleted"); + receiver buildsBumped(conn, "builds_bumped"); + receiver jobsetSharesChanged(conn, "jobset_shares_changed"); auto destStore = getDestStore(); @@ -39,17 +43,17 @@ void State::queueMonitorLoop() while (!quit) { localStore->clearPathInfoCache(); - bool done = getQueuedBuilds(*conn, destStore, lastBuildId); + bool done = getQueuedBuilds(conn, destStore, lastBuildId); if (buildOne && buildOneDone) quit = true; /* Sleep until we get notification from the database about an event. */ if (done && !quit) { - conn->await_notification(); + conn.await_notification(); nrQueueWakeups++; } else - conn->get_notifs(); + conn.get_notifs(); if (auto lowestId = buildsAdded.get()) { lastBuildId = std::min(lastBuildId, static_cast(std::stoul(*lowestId) - 1)); @@ -61,11 +65,11 @@ void State::queueMonitorLoop() } if (buildsCancelled.get() || buildsDeleted.get() || buildsBumped.get()) { printMsg(lvlTalkative, "got notification: builds cancelled or bumped"); - processQueueChange(*conn); + processQueueChange(conn); } if (jobsetSharesChanged.get()) { printMsg(lvlTalkative, "got notification: jobset shares changed"); - processJobsetSharesChange(*conn); + processJobsetSharesChange(conn); } } diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index cda238ae..5d242cdf 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -498,7 +498,7 @@ private: void queueMonitor(); - void queueMonitorLoop(); + void queueMonitorLoop(Connection & conn); /* Check the queue for new builds. */ bool getQueuedBuilds(Connection & conn,