forked from lix-project/hydra
hydra-queue-runner: drop broken connections from pool
Closes #1336 When restarting postgresql, the connections are still reused in `hydra-queue-runner` causing errors like this main thread: Lost connection to the database server. queue monitor: Lost connection to the database server. and no more builds being processed. `hydra-evaluator` doesn't have that issue since it crashes right away. We could let it retry indefinitely as well (see below), but I don't want to change too much. If the DB is still unreachable 10s later, the process will stop with a non-zero exit code because of a missing DB connection. This however isn't such a big deal because it will be immediately restarted afterwards. With the current configuration, Hydra will never give up, but restart (and retry) infinitely. To me that seems reasonable, i.e. to retry DB connections on a long-running process. If this doesn't work out, the monitoring should fire anyways because the queue fills up, but I'm open to discuss that. Please note that this isn't reproducible with the DB and the queue runner on the same machine when using `services.hydra-dev`, because of the `Requires=` dependency `hydra-queue-runner.service` -> `hydra-init.service` -> `postgresql.service` that causes the queue runner to be restarted on `systemctl restart postgresql`. Internally, Hydra uses Nix's pool data structure: it basically has N slots (here DB connections) and whenever a new one is requested, an idle slot is provided or a new one is created (when N slots are active, it'll be waited until one slot is free). The issue in the code here is however that whenever an error is encountered, the slot is released, however the same broken connection will be reused the next time. By using `Pool::Handle::markBad`, Nix will drop a broken slot. This is now being done when `pqxx::broken_connection` was caught.
This commit is contained in:
parent
ef6be80f54
commit
9b62c52e5c
|
@ -928,10 +928,17 @@ void State::run(BuildID buildOne)
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
auto conn(dbPool.get());
|
auto conn(dbPool.get());
|
||||||
receiver dumpStatus_(*conn, "dump_status");
|
try {
|
||||||
while (true) {
|
receiver dumpStatus_(*conn, "dump_status");
|
||||||
conn->await_notification();
|
while (true) {
|
||||||
dumpStatus(*conn);
|
conn->await_notification();
|
||||||
|
dumpStatus(*conn);
|
||||||
|
}
|
||||||
|
} catch (pqxx::broken_connection & connEx) {
|
||||||
|
printMsg(lvlError, "main thread: %s", connEx.what());
|
||||||
|
printMsg(lvlError, "main thread: Reconnecting in 10s");
|
||||||
|
conn.markBad();
|
||||||
|
sleep(10);
|
||||||
}
|
}
|
||||||
} catch (std::exception & e) {
|
} catch (std::exception & e) {
|
||||||
printMsg(lvlError, "main thread: %s", e.what());
|
printMsg(lvlError, "main thread: %s", e.what());
|
||||||
|
|
|
@ -10,8 +10,14 @@ using namespace nix;
|
||||||
void State::queueMonitor()
|
void State::queueMonitor()
|
||||||
{
|
{
|
||||||
while (true) {
|
while (true) {
|
||||||
|
auto conn(dbPool.get());
|
||||||
try {
|
try {
|
||||||
queueMonitorLoop();
|
queueMonitorLoop(*conn);
|
||||||
|
} catch (pqxx::broken_connection & e) {
|
||||||
|
printMsg(lvlError, "queue monitor: %s", e.what());
|
||||||
|
printMsg(lvlError, "queue monitor: Reconnecting in 10s");
|
||||||
|
conn.markBad();
|
||||||
|
sleep(10);
|
||||||
} catch (std::exception & e) {
|
} catch (std::exception & e) {
|
||||||
printError("queue monitor: %s", e.what());
|
printError("queue monitor: %s", e.what());
|
||||||
sleep(10); // probably a DB problem, so don't retry right away
|
sleep(10); // probably a DB problem, so don't retry right away
|
||||||
|
@ -20,16 +26,14 @@ void State::queueMonitor()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void State::queueMonitorLoop()
|
void State::queueMonitorLoop(Connection & conn)
|
||||||
{
|
{
|
||||||
auto conn(dbPool.get());
|
receiver buildsAdded(conn, "builds_added");
|
||||||
|
receiver buildsRestarted(conn, "builds_restarted");
|
||||||
receiver buildsAdded(*conn, "builds_added");
|
receiver buildsCancelled(conn, "builds_cancelled");
|
||||||
receiver buildsRestarted(*conn, "builds_restarted");
|
receiver buildsDeleted(conn, "builds_deleted");
|
||||||
receiver buildsCancelled(*conn, "builds_cancelled");
|
receiver buildsBumped(conn, "builds_bumped");
|
||||||
receiver buildsDeleted(*conn, "builds_deleted");
|
receiver jobsetSharesChanged(conn, "jobset_shares_changed");
|
||||||
receiver buildsBumped(*conn, "builds_bumped");
|
|
||||||
receiver jobsetSharesChanged(*conn, "jobset_shares_changed");
|
|
||||||
|
|
||||||
auto destStore = getDestStore();
|
auto destStore = getDestStore();
|
||||||
|
|
||||||
|
@ -39,17 +43,17 @@ void State::queueMonitorLoop()
|
||||||
while (!quit) {
|
while (!quit) {
|
||||||
localStore->clearPathInfoCache();
|
localStore->clearPathInfoCache();
|
||||||
|
|
||||||
bool done = getQueuedBuilds(*conn, destStore, lastBuildId);
|
bool done = getQueuedBuilds(conn, destStore, lastBuildId);
|
||||||
|
|
||||||
if (buildOne && buildOneDone) quit = true;
|
if (buildOne && buildOneDone) quit = true;
|
||||||
|
|
||||||
/* Sleep until we get notification from the database about an
|
/* Sleep until we get notification from the database about an
|
||||||
event. */
|
event. */
|
||||||
if (done && !quit) {
|
if (done && !quit) {
|
||||||
conn->await_notification();
|
conn.await_notification();
|
||||||
nrQueueWakeups++;
|
nrQueueWakeups++;
|
||||||
} else
|
} else
|
||||||
conn->get_notifs();
|
conn.get_notifs();
|
||||||
|
|
||||||
if (auto lowestId = buildsAdded.get()) {
|
if (auto lowestId = buildsAdded.get()) {
|
||||||
lastBuildId = std::min(lastBuildId, static_cast<unsigned>(std::stoul(*lowestId) - 1));
|
lastBuildId = std::min(lastBuildId, static_cast<unsigned>(std::stoul(*lowestId) - 1));
|
||||||
|
@ -61,11 +65,11 @@ void State::queueMonitorLoop()
|
||||||
}
|
}
|
||||||
if (buildsCancelled.get() || buildsDeleted.get() || buildsBumped.get()) {
|
if (buildsCancelled.get() || buildsDeleted.get() || buildsBumped.get()) {
|
||||||
printMsg(lvlTalkative, "got notification: builds cancelled or bumped");
|
printMsg(lvlTalkative, "got notification: builds cancelled or bumped");
|
||||||
processQueueChange(*conn);
|
processQueueChange(conn);
|
||||||
}
|
}
|
||||||
if (jobsetSharesChanged.get()) {
|
if (jobsetSharesChanged.get()) {
|
||||||
printMsg(lvlTalkative, "got notification: jobset shares changed");
|
printMsg(lvlTalkative, "got notification: jobset shares changed");
|
||||||
processJobsetSharesChange(*conn);
|
processJobsetSharesChange(conn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -531,7 +531,7 @@ private:
|
||||||
|
|
||||||
void queueMonitor();
|
void queueMonitor();
|
||||||
|
|
||||||
void queueMonitorLoop();
|
void queueMonitorLoop(Connection & conn);
|
||||||
|
|
||||||
/* Check the queue for new builds. */
|
/* Check the queue for new builds. */
|
||||||
bool getQueuedBuilds(Connection & conn,
|
bool getQueuedBuilds(Connection & conn,
|
||||||
|
|
Loading…
Reference in a new issue