Merge pull request #1370 from Ma27/reconnect-db

hydra-queue-runner: drop broken connections from pool
This commit is contained in:
John Ericson 2024-03-26 11:21:35 -04:00 committed by GitHub
commit d614163e9c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 31 additions and 20 deletions

View file

@ -926,11 +926,18 @@ void State::run(BuildID buildOne)
while (true) { while (true) {
try { try {
auto conn(dbPool.get()); auto conn(dbPool.get());
try {
receiver dumpStatus_(*conn, "dump_status"); receiver dumpStatus_(*conn, "dump_status");
while (true) { while (true) {
conn->await_notification(); conn->await_notification();
dumpStatus(*conn); dumpStatus(*conn);
} }
} catch (pqxx::broken_connection & connEx) {
printMsg(lvlError, "main thread: %s", connEx.what());
printMsg(lvlError, "main thread: Reconnecting in 10s");
conn.markBad();
sleep(10);
}
} catch (std::exception & e) { } catch (std::exception & e) {
printMsg(lvlError, "main thread: %s", e.what()); printMsg(lvlError, "main thread: %s", e.what());
sleep(10); // probably a DB problem, so don't retry right away sleep(10); // probably a DB problem, so don't retry right away

View file

@ -10,8 +10,14 @@ using namespace nix;
void State::queueMonitor() void State::queueMonitor()
{ {
while (true) { while (true) {
auto conn(dbPool.get());
try { try {
queueMonitorLoop(); queueMonitorLoop(*conn);
} catch (pqxx::broken_connection & e) {
printMsg(lvlError, "queue monitor: %s", e.what());
printMsg(lvlError, "queue monitor: Reconnecting in 10s");
conn.markBad();
sleep(10);
} catch (std::exception & e) { } catch (std::exception & e) {
printError("queue monitor: %s", e.what()); printError("queue monitor: %s", e.what());
sleep(10); // probably a DB problem, so don't retry right away sleep(10); // probably a DB problem, so don't retry right away
@ -20,16 +26,14 @@ void State::queueMonitor()
} }
void State::queueMonitorLoop() void State::queueMonitorLoop(Connection & conn)
{ {
auto conn(dbPool.get()); receiver buildsAdded(conn, "builds_added");
receiver buildsRestarted(conn, "builds_restarted");
receiver buildsAdded(*conn, "builds_added"); receiver buildsCancelled(conn, "builds_cancelled");
receiver buildsRestarted(*conn, "builds_restarted"); receiver buildsDeleted(conn, "builds_deleted");
receiver buildsCancelled(*conn, "builds_cancelled"); receiver buildsBumped(conn, "builds_bumped");
receiver buildsDeleted(*conn, "builds_deleted"); receiver jobsetSharesChanged(conn, "jobset_shares_changed");
receiver buildsBumped(*conn, "builds_bumped");
receiver jobsetSharesChanged(*conn, "jobset_shares_changed");
auto destStore = getDestStore(); auto destStore = getDestStore();
@ -39,17 +43,17 @@ void State::queueMonitorLoop()
while (!quit) { while (!quit) {
localStore->clearPathInfoCache(); localStore->clearPathInfoCache();
bool done = getQueuedBuilds(*conn, destStore, lastBuildId); bool done = getQueuedBuilds(conn, destStore, lastBuildId);
if (buildOne && buildOneDone) quit = true; if (buildOne && buildOneDone) quit = true;
/* Sleep until we get notification from the database about an /* Sleep until we get notification from the database about an
event. */ event. */
if (done && !quit) { if (done && !quit) {
conn->await_notification(); conn.await_notification();
nrQueueWakeups++; nrQueueWakeups++;
} else } else
conn->get_notifs(); conn.get_notifs();
if (auto lowestId = buildsAdded.get()) { if (auto lowestId = buildsAdded.get()) {
lastBuildId = std::min(lastBuildId, static_cast<unsigned>(std::stoul(*lowestId) - 1)); lastBuildId = std::min(lastBuildId, static_cast<unsigned>(std::stoul(*lowestId) - 1));
@ -61,11 +65,11 @@ void State::queueMonitorLoop()
} }
if (buildsCancelled.get() || buildsDeleted.get() || buildsBumped.get()) { if (buildsCancelled.get() || buildsDeleted.get() || buildsBumped.get()) {
printMsg(lvlTalkative, "got notification: builds cancelled or bumped"); printMsg(lvlTalkative, "got notification: builds cancelled or bumped");
processQueueChange(*conn); processQueueChange(conn);
} }
if (jobsetSharesChanged.get()) { if (jobsetSharesChanged.get()) {
printMsg(lvlTalkative, "got notification: jobset shares changed"); printMsg(lvlTalkative, "got notification: jobset shares changed");
processJobsetSharesChange(*conn); processJobsetSharesChange(conn);
} }
} }

View file

@ -498,7 +498,7 @@ private:
void queueMonitor(); void queueMonitor();
void queueMonitorLoop(); void queueMonitorLoop(Connection & conn);
/* Check the queue for new builds. */ /* Check the queue for new builds. */
bool getQueuedBuilds(Connection & conn, bool getQueuedBuilds(Connection & conn,