From 96e36201ebb7748d64f895947d198b370968edd0 Mon Sep 17 00:00:00 2001 From: Linus Heckemann Date: Tue, 29 Nov 2022 18:13:15 +0100 Subject: [PATCH] hydra-queue-runner: adapt to nlohmann::json --- src/hydra-queue-runner/hydra-queue-runner.cc | 249 +++++++++---------- 1 file changed, 114 insertions(+), 135 deletions(-) diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 723bf223..b16fd770 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -8,6 +8,8 @@ #include +#include + #include "state.hh" #include "hydra-build-result.hh" #include "store-api.hh" @@ -15,20 +17,11 @@ #include "globals.hh" #include "hydra-config.hh" -#include "json.hh" #include "s3-binary-cache-store.hh" #include "shared.hh" using namespace nix; - - -namespace nix { - -template<> void toJSON>(std::ostream & str, const std::atomic & n) { str << n; } -template<> void toJSON>(std::ostream & str, const std::atomic & n) { str << n; } -template<> void toJSON(std::ostream & str, const double & n) { str << n; } - -} +using nlohmann::json; std::string getEnvOrDie(const std::string & key) @@ -542,181 +535,167 @@ std::shared_ptr State::acquireGlobalLock() void State::dumpStatus(Connection & conn) { - std::ostringstream out; - + auto root = json::object(); { - JSONObject root(out); time_t now = time(0); - root.attr("status", "up"); - root.attr("time", time(0)); - root.attr("uptime", now - startedAt); - root.attr("pid", getpid()); + root["status"] = "up"; + root["time"] = time(0); + root["uptime"] = now - startedAt; + root["pid"] = getpid(); { auto builds_(builds.lock()); - root.attr("nrQueuedBuilds", builds_->size()); + root["nrQueuedBuilds"] = builds_->size(); } { auto steps_(steps.lock()); for (auto i = steps_->begin(); i != steps_->end(); ) if (i->second.lock()) ++i; else i = steps_->erase(i); - root.attr("nrUnfinishedSteps", steps_->size()); + root["nrUnfinishedSteps"] = steps_->size(); } { auto runnable_(runnable.lock()); for (auto i = runnable_->begin(); i != runnable_->end(); ) if (i->lock()) ++i; else i = runnable_->erase(i); - root.attr("nrRunnableSteps", runnable_->size()); + root["nrRunnableSteps"] = runnable_->size(); } - root.attr("nrActiveSteps", activeSteps_.lock()->size()); - root.attr("nrStepsBuilding", nrStepsBuilding); - root.attr("nrStepsCopyingTo", nrStepsCopyingTo); - root.attr("nrStepsCopyingFrom", nrStepsCopyingFrom); - root.attr("nrStepsWaiting", nrStepsWaiting); - root.attr("nrUnsupportedSteps", nrUnsupportedSteps); - root.attr("bytesSent", bytesSent); - root.attr("bytesReceived", bytesReceived); - root.attr("nrBuildsRead", nrBuildsRead); - root.attr("buildReadTimeMs", buildReadTimeMs); - root.attr("buildReadTimeAvgMs", nrBuildsRead == 0 ? 0.0 : (float) buildReadTimeMs / nrBuildsRead); - root.attr("nrBuildsDone", nrBuildsDone); - root.attr("nrStepsStarted", nrStepsStarted); - root.attr("nrStepsDone", nrStepsDone); - root.attr("nrRetries", nrRetries); - root.attr("maxNrRetries", maxNrRetries); + root["nrActiveSteps"] = activeSteps_.lock()->size(); + root["nrStepsBuilding"] = nrStepsBuilding.load(); + root["nrStepsCopyingTo"] = nrStepsCopyingTo.load(); + root["nrStepsCopyingFrom"] = nrStepsCopyingFrom.load(); + root["nrStepsWaiting"] = nrStepsWaiting.load(); + root["nrUnsupportedSteps"] = nrUnsupportedSteps.load(); + root["bytesSent"] = bytesSent.load(); + root["bytesReceived"] = bytesReceived.load(); + root["nrBuildsRead"] = nrBuildsRead.load(); + root["buildReadTimeMs"] = buildReadTimeMs.load(); + root["buildReadTimeAvgMs"] = nrBuildsRead == 0 ? 0.0 : (float) buildReadTimeMs / nrBuildsRead; + root["nrBuildsDone"] = nrBuildsDone.load(); + root["nrStepsStarted"] = nrStepsStarted.load(); + root["nrStepsDone"] = nrStepsDone.load(); + root["nrRetries"] = nrRetries.load(); + root["maxNrRetries"] = maxNrRetries.load(); if (nrStepsDone) { - root.attr("totalStepTime", totalStepTime); - root.attr("totalStepBuildTime", totalStepBuildTime); - root.attr("avgStepTime", (float) totalStepTime / nrStepsDone); - root.attr("avgStepBuildTime", (float) totalStepBuildTime / nrStepsDone); + root["totalStepTime"] = totalStepTime.load(); + root["totalStepBuildTime"] = totalStepBuildTime.load(); + root["avgStepTime"] = (float) totalStepTime / nrStepsDone; + root["avgStepBuildTime"] = (float) totalStepBuildTime / nrStepsDone; } - root.attr("nrQueueWakeups", nrQueueWakeups); - root.attr("nrDispatcherWakeups", nrDispatcherWakeups); - root.attr("dispatchTimeMs", dispatchTimeMs); - root.attr("dispatchTimeAvgMs", nrDispatcherWakeups == 0 ? 0.0 : (float) dispatchTimeMs / nrDispatcherWakeups); - root.attr("nrDbConnections", dbPool.count()); - root.attr("nrActiveDbUpdates", nrActiveDbUpdates); + root["nrQueueWakeups"] = nrQueueWakeups.load(); + root["nrDispatcherWakeups"] = nrDispatcherWakeups.load(); + root["dispatchTimeMs"] = dispatchTimeMs.load(); + root["dispatchTimeAvgMs"] = nrDispatcherWakeups == 0 ? 0.0 : (float) dispatchTimeMs / nrDispatcherWakeups; + root["nrDbConnections"] = dbPool.count(); + root["nrActiveDbUpdates"] = nrActiveDbUpdates.load(); { - auto nested = root.object("machines"); + auto nested = root["machines"]; auto machines_(machines.lock()); for (auto & i : *machines_) { auto & m(i.second); auto & s(m->state); - auto nested2 = nested.object(m->sshName); - nested2.attr("enabled", m->enabled); - - { - auto list = nested2.list("systemTypes"); - for (auto & s : m->systemTypes) - list.elem(s); - } - - { - auto list = nested2.list("supportedFeatures"); - for (auto & s : m->supportedFeatures) - list.elem(s); - } - - { - auto list = nested2.list("mandatoryFeatures"); - for (auto & s : m->mandatoryFeatures) - list.elem(s); - } - - nested2.attr("currentJobs", s->currentJobs); - if (s->currentJobs == 0) - nested2.attr("idleSince", s->idleSince); - nested2.attr("nrStepsDone", s->nrStepsDone); - if (m->state->nrStepsDone) { - nested2.attr("totalStepTime", s->totalStepTime); - nested2.attr("totalStepBuildTime", s->totalStepBuildTime); - nested2.attr("avgStepTime", (float) s->totalStepTime / s->nrStepsDone); - nested2.attr("avgStepBuildTime", (float) s->totalStepBuildTime / s->nrStepsDone); - } - auto info(m->state->connectInfo.lock()); - nested2.attr("disabledUntil", std::chrono::system_clock::to_time_t(info->disabledUntil)); - nested2.attr("lastFailure", std::chrono::system_clock::to_time_t(info->lastFailure)); - nested2.attr("consecutiveFailures", info->consecutiveFailures); + auto machine = nested[m->sshName] = { + {"enabled", m->enabled}, + {"systemTypes", m->systemTypes}, + {"supportedFeatures", m->supportedFeatures}, + {"mandatoryFeatures", m->mandatoryFeatures}, + {"nrStepsDone", s->nrStepsDone.load()}, + {"currentJobs", s->currentJobs.load()}, + {"disabledUntil", std::chrono::system_clock::to_time_t(info->disabledUntil)}, + {"lastFailure", std::chrono::system_clock::to_time_t(info->lastFailure)}, + {"consecutiveFailures", info->consecutiveFailures}, + }; + + if (s->currentJobs == 0) + machine["idleSince"] = s->idleSince.load(); + if (m->state->nrStepsDone) { + machine["totalStepTime"] = s->totalStepTime.load(); + machine["totalStepBuildTime"] = s->totalStepBuildTime.load(); + machine["avgStepTime"] = (float) s->totalStepTime / s->nrStepsDone; + machine["avgStepBuildTime"] = (float) s->totalStepBuildTime / s->nrStepsDone; + } } } { - auto nested = root.object("jobsets"); + auto jobsets_json = root["jobsets"]; auto jobsets_(jobsets.lock()); for (auto & jobset : *jobsets_) { - auto nested2 = nested.object(jobset.first.first + ":" + jobset.first.second); - nested2.attr("shareUsed", jobset.second->shareUsed()); - nested2.attr("seconds", jobset.second->getSeconds()); + jobsets_json[jobset.first.first + ":" + jobset.first.second] = { + {"shareUsed", jobset.second->shareUsed()}, + {"seconds", jobset.second->getSeconds()}, + }; } } { - auto nested = root.object("machineTypes"); + auto machineTypesJson = root["machineTypes"]; auto machineTypes_(machineTypes.lock()); for (auto & i : *machineTypes_) { - auto nested2 = nested.object(i.first); - nested2.attr("runnable", i.second.runnable); - nested2.attr("running", i.second.running); + auto machineTypeJson = machineTypesJson[i.first] = { + {"runnable", i.second.runnable}, + {"running", i.second.running}, + }; if (i.second.runnable > 0) - nested2.attr("waitTime", i.second.waitTime.count() + - i.second.runnable * (time(0) - lastDispatcherCheck)); + machineTypeJson["waitTime"] = i.second.waitTime.count() + + i.second.runnable * (time(0) - lastDispatcherCheck); if (i.second.running == 0) - nested2.attr("lastActive", std::chrono::system_clock::to_time_t(i.second.lastActive)); + machineTypeJson["lastActive"] = std::chrono::system_clock::to_time_t(i.second.lastActive); } } auto store = getDestStore(); - auto nested = root.object("store"); - auto & stats = store->getStats(); - nested.attr("narInfoRead", stats.narInfoRead); - nested.attr("narInfoReadAverted", stats.narInfoReadAverted); - nested.attr("narInfoMissing", stats.narInfoMissing); - nested.attr("narInfoWrite", stats.narInfoWrite); - nested.attr("narInfoCacheSize", stats.pathInfoCacheSize); - nested.attr("narRead", stats.narRead); - nested.attr("narReadBytes", stats.narReadBytes); - nested.attr("narReadCompressedBytes", stats.narReadCompressedBytes); - nested.attr("narWrite", stats.narWrite); - nested.attr("narWriteAverted", stats.narWriteAverted); - nested.attr("narWriteBytes", stats.narWriteBytes); - nested.attr("narWriteCompressedBytes", stats.narWriteCompressedBytes); - nested.attr("narWriteCompressionTimeMs", stats.narWriteCompressionTimeMs); - nested.attr("narCompressionSavings", - stats.narWriteBytes - ? 1.0 - (double) stats.narWriteCompressedBytes / stats.narWriteBytes - : 0.0); - nested.attr("narCompressionSpeed", // MiB/s + root["store"] = { + {"narInfoRead", stats.narInfoRead.load()}, + {"narInfoReadAverted", stats.narInfoReadAverted.load()}, + {"narInfoMissing", stats.narInfoMissing.load()}, + {"narInfoWrite", stats.narInfoWrite.load()}, + {"narInfoCacheSize", stats.pathInfoCacheSize.load()}, + {"narRead", stats.narRead.load()}, + {"narReadBytes", stats.narReadBytes.load()}, + {"narReadCompressedBytes", stats.narReadCompressedBytes.load()}, + {"narWrite", stats.narWrite.load()}, + {"narWriteAverted", stats.narWriteAverted.load()}, + {"narWriteBytes", stats.narWriteBytes.load()}, + {"narWriteCompressedBytes", stats.narWriteCompressedBytes.load()}, + {"narWriteCompressionTimeMs", stats.narWriteCompressionTimeMs.load()}, + {"narCompressionSavings", + stats.narWriteBytes + ? 1.0 - (double) stats.narWriteCompressedBytes / stats.narWriteBytes + : 0.0}, + {"narCompressionSpeed", // MiB/s stats.narWriteCompressionTimeMs ? (double) stats.narWriteBytes / stats.narWriteCompressionTimeMs * 1000.0 / (1024.0 * 1024.0) - : 0.0); + : 0.0}, + }; auto s3Store = dynamic_cast(&*store); if (s3Store) { - auto nested2 = nested.object("s3"); auto & s3Stats = s3Store->getS3Stats(); - nested2.attr("put", s3Stats.put); - nested2.attr("putBytes", s3Stats.putBytes); - nested2.attr("putTimeMs", s3Stats.putTimeMs); - nested2.attr("putSpeed", - s3Stats.putTimeMs - ? (double) s3Stats.putBytes / s3Stats.putTimeMs * 1000.0 / (1024.0 * 1024.0) - : 0.0); - nested2.attr("get", s3Stats.get); - nested2.attr("getBytes", s3Stats.getBytes); - nested2.attr("getTimeMs", s3Stats.getTimeMs); - nested2.attr("getSpeed", - s3Stats.getTimeMs - ? (double) s3Stats.getBytes / s3Stats.getTimeMs * 1000.0 / (1024.0 * 1024.0) - : 0.0); - nested2.attr("head", s3Stats.head); - nested2.attr("costDollarApprox", - (s3Stats.get + s3Stats.head) / 10000.0 * 0.004 - + s3Stats.put / 1000.0 * 0.005 + - + s3Stats.getBytes / (1024.0 * 1024.0 * 1024.0) * 0.09); + auto jsonS3 = root["s3"] = { + {"put", s3Stats.put.load()}, + {"putBytes", s3Stats.putBytes.load()}, + {"putTimeMs", s3Stats.putTimeMs.load()}, + {"putSpeed", + s3Stats.putTimeMs + ? (double) s3Stats.putBytes / s3Stats.putTimeMs * 1000.0 / (1024.0 * 1024.0) + : 0.0}, + {"get", s3Stats.get.load()}, + {"getBytes", s3Stats.getBytes.load()}, + {"getTimeMs", s3Stats.getTimeMs.load()}, + {"getSpeed", + s3Stats.getTimeMs + ? (double) s3Stats.getBytes / s3Stats.getTimeMs * 1000.0 / (1024.0 * 1024.0) + : 0.0}, + {"head", s3Stats.head.load()}, + {"costDollarApprox", + (s3Stats.get + s3Stats.head) / 10000.0 * 0.004 + + s3Stats.put / 1000.0 * 0.005 + + + s3Stats.getBytes / (1024.0 * 1024.0 * 1024.0) * 0.09}, + }; } } @@ -725,7 +704,7 @@ void State::dumpStatus(Connection & conn) pqxx::work txn(conn); // FIXME: use PostgreSQL 9.5 upsert. txn.exec("delete from SystemStatus where what = 'queue-runner'"); - txn.exec_params0("insert into SystemStatus values ('queue-runner', $1)", out.str()); + txn.exec_params0("insert into SystemStatus values ('queue-runner', $1)", root.dump()); txn.exec("notify status_dumped"); txn.commit(); }