hydra-queue-runner: Handle status queries on the main thread
Doing it on the queue monitor thread was problematic because processing the queue can take a while.
This commit is contained in:
parent
a40ca6b76e
commit
69be3cfe93
1 changed files with 33 additions and 19 deletions
|
@ -84,6 +84,23 @@ struct Connection : pqxx::connection
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
struct receiver : public pqxx::notification_receiver
|
||||||
|
{
|
||||||
|
bool status = false;
|
||||||
|
receiver(pqxx::connection_base & c, const std::string & channel)
|
||||||
|
: pqxx::notification_receiver(c, channel) { }
|
||||||
|
void operator() (const string & payload, int pid) override
|
||||||
|
{
|
||||||
|
status = true;
|
||||||
|
};
|
||||||
|
bool get() {
|
||||||
|
bool b = status;
|
||||||
|
status = false;
|
||||||
|
return b;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
typedef unsigned int BuildID;
|
typedef unsigned int BuildID;
|
||||||
|
|
||||||
|
|
||||||
|
@ -435,26 +452,9 @@ void State::queueMonitorLoop()
|
||||||
{
|
{
|
||||||
auto conn(dbPool.get());
|
auto conn(dbPool.get());
|
||||||
|
|
||||||
struct receiver : public pqxx::notification_receiver
|
|
||||||
{
|
|
||||||
bool status = false;
|
|
||||||
receiver(pqxx::connection_base & c, const std::string & channel)
|
|
||||||
: pqxx::notification_receiver(c, channel) { }
|
|
||||||
void operator() (const string & payload, int pid) override
|
|
||||||
{
|
|
||||||
status = true;
|
|
||||||
};
|
|
||||||
bool get() {
|
|
||||||
bool b = status;
|
|
||||||
status = false;
|
|
||||||
return b;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
receiver buildsAdded(*conn, "builds_added");
|
receiver buildsAdded(*conn, "builds_added");
|
||||||
receiver buildsRestarted(*conn, "builds_restarted");
|
receiver buildsRestarted(*conn, "builds_restarted");
|
||||||
receiver buildsCancelled(*conn, "builds_cancelled");
|
receiver buildsCancelled(*conn, "builds_cancelled");
|
||||||
receiver dumpStatus(*conn, "dump_status");
|
|
||||||
|
|
||||||
auto store = openStore(); // FIXME: pool
|
auto store = openStore(); // FIXME: pool
|
||||||
|
|
||||||
|
@ -479,8 +479,6 @@ void State::queueMonitorLoop()
|
||||||
removeCancelledBuilds(*conn);
|
removeCancelledBuilds(*conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dumpStatus.get())
|
|
||||||
State::dumpStatus();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1314,6 +1312,22 @@ void State::run()
|
||||||
|
|
||||||
std::thread(&State::dispatcher, this).detach();
|
std::thread(&State::dispatcher, this).detach();
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
auto conn(dbPool.get());
|
||||||
|
receiver dumpStatus(*conn, "dump_status");
|
||||||
|
while (true) {
|
||||||
|
conn->await_notification();
|
||||||
|
if (dumpStatus.get())
|
||||||
|
State::dumpStatus();
|
||||||
|
}
|
||||||
|
} catch (std::exception & e) {
|
||||||
|
printMsg(lvlError, format("main thread: %1%") % e.what());
|
||||||
|
sleep(10); // probably a DB problem, so don't retry right away
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Never reached.
|
||||||
queueMonitorThread.join();
|
queueMonitorThread.join();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue