hydra-queue-runner: adapt to nlohmann::json

Linus Heckemann, 2022-11-29 18:13:15 +01:00 (committed by Maximilian Bosch)
parent f48f00ee6d
commit 96e36201eb
GPG key ID: 9A6EEA275CA5BE0A (no known key found for this signature in the database)
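
For context: the patch replaces Nix's old streaming JSON writer (JSONObject and its attr()/list() calls from the removed json.hh) with nlohmann::json, which builds the whole status document in memory and serializes it once at the end. Below is a minimal sketch of the new call shapes; it is not part of the patch, and the keys and values are made up for illustration:

    #include <iostream>
    #include <nlohmann/json.hpp>

    using nlohmann::json;

    int main()
    {
        // New style: build the document in memory, one assignment per key.
        // (The removed JSONObject API instead streamed root.attr("status", "up")
        // straight into a std::ostringstream.)
        auto root = json::object();
        root["status"] = "up";
        root["nrQueuedBuilds"] = 7;        // made-up value

        // Nested objects come from initializer lists, as in the patch's
        // "machines" and "store" sections.
        root["store"] = {
            {"narRead", 42},
            {"narReadBytes", 1048576},
        };

        // Serialization happens once, at the end.
        std::cout << root.dump() << std::endl;
    }

One subtlety of this style: auto v = j["key"] copies the value, so later writes through v do not land in j; binding a reference (auto & v = j["key"]) is needed for in-place mutation.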


@@ -8,6 +8,8 @@
 #include <prometheus/exposer.h>

+#include <nlohmann/json.hpp>
+
 #include "state.hh"
 #include "hydra-build-result.hh"
 #include "store-api.hh"
@@ -15,20 +17,11 @@
 #include "globals.hh"
 #include "hydra-config.hh"
-#include "json.hh"
 #include "s3-binary-cache-store.hh"
 #include "shared.hh"

 using namespace nix;
+using nlohmann::json;

-namespace nix {
-
-template<> void toJSON<std::atomic<long>>(std::ostream & str, const std::atomic<long> & n) { str << n; }
-template<> void toJSON<std::atomic<uint64_t>>(std::ostream & str, const std::atomic<uint64_t> & n) { str << n; }
-template<> void toJSON<double>(std::ostream & str, const double & n) { str << n; }
-
-}

 std::string getEnvOrDie(const std::string & key)
@@ -542,181 +535,167 @@ std::shared_ptr<PathLocks> State::acquireGlobalLock()
 void State::dumpStatus(Connection & conn)
 {
-    std::ostringstream out;
+    auto root = json::object();
     {
-        JSONObject root(out);
         time_t now = time(0);
-        root.attr("status", "up");
-        root.attr("time", time(0));
-        root.attr("uptime", now - startedAt);
-        root.attr("pid", getpid());
+        root["status"] = "up";
+        root["time"] = time(0);
+        root["uptime"] = now - startedAt;
+        root["pid"] = getpid();
         {
             auto builds_(builds.lock());
-            root.attr("nrQueuedBuilds", builds_->size());
+            root["nrQueuedBuilds"] = builds_->size();
         }
         {
             auto steps_(steps.lock());
             for (auto i = steps_->begin(); i != steps_->end(); )
                 if (i->second.lock()) ++i; else i = steps_->erase(i);
-            root.attr("nrUnfinishedSteps", steps_->size());
+            root["nrUnfinishedSteps"] = steps_->size();
         }
         {
             auto runnable_(runnable.lock());
             for (auto i = runnable_->begin(); i != runnable_->end(); )
                 if (i->lock()) ++i; else i = runnable_->erase(i);
-            root.attr("nrRunnableSteps", runnable_->size());
+            root["nrRunnableSteps"] = runnable_->size();
         }
root.attr("nrActiveSteps", activeSteps_.lock()->size()); root["nrActiveSteps"] = activeSteps_.lock()->size();
root.attr("nrStepsBuilding", nrStepsBuilding); root["nrStepsBuilding"] = nrStepsBuilding.load();
root.attr("nrStepsCopyingTo", nrStepsCopyingTo); root["nrStepsCopyingTo"] = nrStepsCopyingTo.load();
root.attr("nrStepsCopyingFrom", nrStepsCopyingFrom); root["nrStepsCopyingFrom"] = nrStepsCopyingFrom.load();
root.attr("nrStepsWaiting", nrStepsWaiting); root["nrStepsWaiting"] = nrStepsWaiting.load();
root.attr("nrUnsupportedSteps", nrUnsupportedSteps); root["nrUnsupportedSteps"] = nrUnsupportedSteps.load();
root.attr("bytesSent", bytesSent); root["bytesSent"] = bytesSent.load();
root.attr("bytesReceived", bytesReceived); root["bytesReceived"] = bytesReceived.load();
root.attr("nrBuildsRead", nrBuildsRead); root["nrBuildsRead"] = nrBuildsRead.load();
root.attr("buildReadTimeMs", buildReadTimeMs); root["buildReadTimeMs"] = buildReadTimeMs.load();
root.attr("buildReadTimeAvgMs", nrBuildsRead == 0 ? 0.0 : (float) buildReadTimeMs / nrBuildsRead); root["buildReadTimeAvgMs"] = nrBuildsRead == 0 ? 0.0 : (float) buildReadTimeMs / nrBuildsRead;
root.attr("nrBuildsDone", nrBuildsDone); root["nrBuildsDone"] = nrBuildsDone.load();
root.attr("nrStepsStarted", nrStepsStarted); root["nrStepsStarted"] = nrStepsStarted.load();
root.attr("nrStepsDone", nrStepsDone); root["nrStepsDone"] = nrStepsDone.load();
root.attr("nrRetries", nrRetries); root["nrRetries"] = nrRetries.load();
root.attr("maxNrRetries", maxNrRetries); root["maxNrRetries"] = maxNrRetries.load();
if (nrStepsDone) { if (nrStepsDone) {
root.attr("totalStepTime", totalStepTime); root["totalStepTime"] = totalStepTime.load();
root.attr("totalStepBuildTime", totalStepBuildTime); root["totalStepBuildTime"] = totalStepBuildTime.load();
root.attr("avgStepTime", (float) totalStepTime / nrStepsDone); root["avgStepTime"] = (float) totalStepTime / nrStepsDone;
root.attr("avgStepBuildTime", (float) totalStepBuildTime / nrStepsDone); root["avgStepBuildTime"] = (float) totalStepBuildTime / nrStepsDone;
} }
root.attr("nrQueueWakeups", nrQueueWakeups); root["nrQueueWakeups"] = nrQueueWakeups.load();
root.attr("nrDispatcherWakeups", nrDispatcherWakeups); root["nrDispatcherWakeups"] = nrDispatcherWakeups.load();
root.attr("dispatchTimeMs", dispatchTimeMs); root["dispatchTimeMs"] = dispatchTimeMs.load();
root.attr("dispatchTimeAvgMs", nrDispatcherWakeups == 0 ? 0.0 : (float) dispatchTimeMs / nrDispatcherWakeups); root["dispatchTimeAvgMs"] = nrDispatcherWakeups == 0 ? 0.0 : (float) dispatchTimeMs / nrDispatcherWakeups;
root.attr("nrDbConnections", dbPool.count()); root["nrDbConnections"] = dbPool.count();
root.attr("nrActiveDbUpdates", nrActiveDbUpdates); root["nrActiveDbUpdates"] = nrActiveDbUpdates.load();
         {
-            auto nested = root.object("machines");
+            auto nested = root["machines"];
             auto machines_(machines.lock());
             for (auto & i : *machines_) {
                 auto & m(i.second);
                 auto & s(m->state);
-                auto nested2 = nested.object(m->sshName);
-                nested2.attr("enabled", m->enabled);
-                {
-                    auto list = nested2.list("systemTypes");
-                    for (auto & s : m->systemTypes)
-                        list.elem(s);
-                }
-                {
-                    auto list = nested2.list("supportedFeatures");
-                    for (auto & s : m->supportedFeatures)
-                        list.elem(s);
-                }
-                {
-                    auto list = nested2.list("mandatoryFeatures");
-                    for (auto & s : m->mandatoryFeatures)
-                        list.elem(s);
-                }
-                nested2.attr("currentJobs", s->currentJobs);
-                if (s->currentJobs == 0)
-                    nested2.attr("idleSince", s->idleSince);
-                nested2.attr("nrStepsDone", s->nrStepsDone);
-                if (m->state->nrStepsDone) {
-                    nested2.attr("totalStepTime", s->totalStepTime);
-                    nested2.attr("totalStepBuildTime", s->totalStepBuildTime);
-                    nested2.attr("avgStepTime", (float) s->totalStepTime / s->nrStepsDone);
-                    nested2.attr("avgStepBuildTime", (float) s->totalStepBuildTime / s->nrStepsDone);
-                }
                 auto info(m->state->connectInfo.lock());
-                nested2.attr("disabledUntil", std::chrono::system_clock::to_time_t(info->disabledUntil));
-                nested2.attr("lastFailure", std::chrono::system_clock::to_time_t(info->lastFailure));
-                nested2.attr("consecutiveFailures", info->consecutiveFailures);
+                auto machine = nested[m->sshName] = {
+                    {"enabled", m->enabled},
+                    {"systemTypes", m->systemTypes},
+                    {"supportedFeatures", m->supportedFeatures},
+                    {"mandatoryFeatures", m->mandatoryFeatures},
+                    {"nrStepsDone", s->nrStepsDone.load()},
+                    {"currentJobs", s->currentJobs.load()},
+                    {"disabledUntil", std::chrono::system_clock::to_time_t(info->disabledUntil)},
+                    {"lastFailure", std::chrono::system_clock::to_time_t(info->lastFailure)},
+                    {"consecutiveFailures", info->consecutiveFailures},
+                };
+                if (s->currentJobs == 0)
+                    machine["idleSince"] = s->idleSince.load();
+                if (m->state->nrStepsDone) {
+                    machine["totalStepTime"] = s->totalStepTime.load();
+                    machine["totalStepBuildTime"] = s->totalStepBuildTime.load();
+                    machine["avgStepTime"] = (float) s->totalStepTime / s->nrStepsDone;
+                    machine["avgStepBuildTime"] = (float) s->totalStepBuildTime / s->nrStepsDone;
+                }
             }
         }
         {
-            auto nested = root.object("jobsets");
+            auto jobsets_json = root["jobsets"];
             auto jobsets_(jobsets.lock());
             for (auto & jobset : *jobsets_) {
-                auto nested2 = nested.object(jobset.first.first + ":" + jobset.first.second);
-                nested2.attr("shareUsed", jobset.second->shareUsed());
-                nested2.attr("seconds", jobset.second->getSeconds());
+                jobsets_json[jobset.first.first + ":" + jobset.first.second] = {
+                    {"shareUsed", jobset.second->shareUsed()},
+                    {"seconds", jobset.second->getSeconds()},
+                };
             }
         }
         {
-            auto nested = root.object("machineTypes");
+            auto machineTypesJson = root["machineTypes"];
             auto machineTypes_(machineTypes.lock());
             for (auto & i : *machineTypes_) {
-                auto nested2 = nested.object(i.first);
-                nested2.attr("runnable", i.second.runnable);
-                nested2.attr("running", i.second.running);
+                auto machineTypeJson = machineTypesJson[i.first] = {
+                    {"runnable", i.second.runnable},
+                    {"running", i.second.running},
+                };
                 if (i.second.runnable > 0)
-                    nested2.attr("waitTime", i.second.waitTime.count() +
-                        i.second.runnable * (time(0) - lastDispatcherCheck));
+                    machineTypeJson["waitTime"] = i.second.waitTime.count() +
+                        i.second.runnable * (time(0) - lastDispatcherCheck);
                 if (i.second.running == 0)
-                    nested2.attr("lastActive", std::chrono::system_clock::to_time_t(i.second.lastActive));
+                    machineTypeJson["lastActive"] = std::chrono::system_clock::to_time_t(i.second.lastActive);
             }
         }
         auto store = getDestStore();
-        auto nested = root.object("store");
         auto & stats = store->getStats();
-        nested.attr("narInfoRead", stats.narInfoRead);
-        nested.attr("narInfoReadAverted", stats.narInfoReadAverted);
-        nested.attr("narInfoMissing", stats.narInfoMissing);
-        nested.attr("narInfoWrite", stats.narInfoWrite);
-        nested.attr("narInfoCacheSize", stats.pathInfoCacheSize);
-        nested.attr("narRead", stats.narRead);
-        nested.attr("narReadBytes", stats.narReadBytes);
-        nested.attr("narReadCompressedBytes", stats.narReadCompressedBytes);
-        nested.attr("narWrite", stats.narWrite);
-        nested.attr("narWriteAverted", stats.narWriteAverted);
-        nested.attr("narWriteBytes", stats.narWriteBytes);
-        nested.attr("narWriteCompressedBytes", stats.narWriteCompressedBytes);
-        nested.attr("narWriteCompressionTimeMs", stats.narWriteCompressionTimeMs);
-        nested.attr("narCompressionSavings",
-            stats.narWriteBytes
-            ? 1.0 - (double) stats.narWriteCompressedBytes / stats.narWriteBytes
-            : 0.0);
-        nested.attr("narCompressionSpeed", // MiB/s
-            stats.narWriteCompressionTimeMs
-            ? (double) stats.narWriteBytes / stats.narWriteCompressionTimeMs * 1000.0 / (1024.0 * 1024.0)
-            : 0.0);
+        root["store"] = {
+            {"narInfoRead", stats.narInfoRead.load()},
+            {"narInfoReadAverted", stats.narInfoReadAverted.load()},
+            {"narInfoMissing", stats.narInfoMissing.load()},
+            {"narInfoWrite", stats.narInfoWrite.load()},
+            {"narInfoCacheSize", stats.pathInfoCacheSize.load()},
+            {"narRead", stats.narRead.load()},
+            {"narReadBytes", stats.narReadBytes.load()},
+            {"narReadCompressedBytes", stats.narReadCompressedBytes.load()},
+            {"narWrite", stats.narWrite.load()},
+            {"narWriteAverted", stats.narWriteAverted.load()},
+            {"narWriteBytes", stats.narWriteBytes.load()},
+            {"narWriteCompressedBytes", stats.narWriteCompressedBytes.load()},
+            {"narWriteCompressionTimeMs", stats.narWriteCompressionTimeMs.load()},
+            {"narCompressionSavings",
+                stats.narWriteBytes
+                ? 1.0 - (double) stats.narWriteCompressedBytes / stats.narWriteBytes
+                : 0.0},
+            {"narCompressionSpeed", // MiB/s
+                stats.narWriteCompressionTimeMs
+                ? (double) stats.narWriteBytes / stats.narWriteCompressionTimeMs * 1000.0 / (1024.0 * 1024.0)
+                : 0.0},
+        };
         auto s3Store = dynamic_cast<S3BinaryCacheStore *>(&*store);
         if (s3Store) {
-            auto nested2 = nested.object("s3");
             auto & s3Stats = s3Store->getS3Stats();
-            nested2.attr("put", s3Stats.put);
-            nested2.attr("putBytes", s3Stats.putBytes);
-            nested2.attr("putTimeMs", s3Stats.putTimeMs);
-            nested2.attr("putSpeed",
-                s3Stats.putTimeMs
-                ? (double) s3Stats.putBytes / s3Stats.putTimeMs * 1000.0 / (1024.0 * 1024.0)
-                : 0.0);
-            nested2.attr("get", s3Stats.get);
-            nested2.attr("getBytes", s3Stats.getBytes);
-            nested2.attr("getTimeMs", s3Stats.getTimeMs);
-            nested2.attr("getSpeed",
-                s3Stats.getTimeMs
-                ? (double) s3Stats.getBytes / s3Stats.getTimeMs * 1000.0 / (1024.0 * 1024.0)
-                : 0.0);
-            nested2.attr("head", s3Stats.head);
-            nested2.attr("costDollarApprox",
-                (s3Stats.get + s3Stats.head) / 10000.0 * 0.004
-                + s3Stats.put / 1000.0 * 0.005 +
-                + s3Stats.getBytes / (1024.0 * 1024.0 * 1024.0) * 0.09);
+            auto jsonS3 = root["s3"] = {
+                {"put", s3Stats.put.load()},
+                {"putBytes", s3Stats.putBytes.load()},
+                {"putTimeMs", s3Stats.putTimeMs.load()},
+                {"putSpeed",
+                    s3Stats.putTimeMs
+                    ? (double) s3Stats.putBytes / s3Stats.putTimeMs * 1000.0 / (1024.0 * 1024.0)
+                    : 0.0},
+                {"get", s3Stats.get.load()},
+                {"getBytes", s3Stats.getBytes.load()},
+                {"getTimeMs", s3Stats.getTimeMs.load()},
+                {"getSpeed",
+                    s3Stats.getTimeMs
+                    ? (double) s3Stats.getBytes / s3Stats.getTimeMs * 1000.0 / (1024.0 * 1024.0)
+                    : 0.0},
+                {"head", s3Stats.head.load()},
+                {"costDollarApprox",
+                    (s3Stats.get + s3Stats.head) / 10000.0 * 0.004
+                    + s3Stats.put / 1000.0 * 0.005 +
+                    + s3Stats.getBytes / (1024.0 * 1024.0 * 1024.0) * 0.09},
+            };
         }
     }
@@ -725,7 +704,7 @@ void State::dumpStatus(Connection & conn)
         pqxx::work txn(conn);
         // FIXME: use PostgreSQL 9.5 upsert.
         txn.exec("delete from SystemStatus where what = 'queue-runner'");
-        txn.exec_params0("insert into SystemStatus values ('queue-runner', $1)", out.str());
+        txn.exec_params0("insert into SystemStatus values ('queue-runner', $1)", root.dump());
         txn.exec("notify status_dumped");
         txn.commit();
     }
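
The .load() calls sprinkled through the new code are the counterpart to the toJSON<std::atomic<...>> specializations deleted in the second hunk: the old writer was taught to stream atomics directly, while nlohmann::json ships no to_json() for std::atomic<T>, so each counter is converted to its underlying integer explicitly. A minimal sketch of that pattern, with a hypothetical counter (not from the patch):

    #include <atomic>
    #include <cstdint>
    #include <nlohmann/json.hpp>

    int main()
    {
        std::atomic<uint64_t> nrStepsDone{42};     // hypothetical counter

        nlohmann::json root;
        // No serializer exists for std::atomic<T>, so read out the
        // plain uint64_t and store that instead:
        root["nrStepsDone"] = nrStepsDone.load();

        return root["nrStepsDone"] == 42 ? 0 : 1;  // exits 0
    }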