diff --git a/flake.lock b/flake.lock index b41b843a..08fd86ad 100644 --- a/flake.lock +++ b/flake.lock @@ -23,32 +23,32 @@ "nixpkgs-regression": "nixpkgs-regression" }, "locked": { - "lastModified": 1661606874, - "narHash": "sha256-9+rpYzI+SmxJn+EbYxjGv68Ucp22bdFUSy/4LkHkkDQ=", - "owner": "NixOS", + "lastModified": 1677045134, + "narHash": "sha256-jUc2ccTR8f6MGY2pUKgujm+lxSPNGm/ZAP+toX+nMNc=", + "owner": "nixos", "repo": "nix", - "rev": "11e45768b34fdafdcf019ddbd337afa16127ff0f", + "rev": "4acc684ef7b3117c6d6ac12837398a0008a53d85", "type": "github" }, "original": { - "owner": "NixOS", - "ref": "2.11.0", + "owner": "nixos", + "ref": "2.13.3", "repo": "nix", "type": "github" } }, "nixpkgs": { "locked": { - "lastModified": 1657693803, - "narHash": "sha256-G++2CJ9u0E7NNTAi9n5G8TdDmGJXcIjkJ3NF8cetQB8=", + "lastModified": 1670461440, + "narHash": "sha256-jy1LB8HOMKGJEGXgzFRLDU1CBGL0/LlkolgnqIsF0D8=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "365e1b3a859281cf11b94f87231adeabbdd878a2", + "rev": "04a75b2eecc0acf6239acf9dd04485ff8d14f425", "type": "github" }, "original": { "owner": "NixOS", - "ref": "nixos-22.05-small", + "ref": "nixos-22.11-small", "repo": "nixpkgs", "type": "github" } diff --git a/flake.nix b/flake.nix index cd9f094d..beeb90c1 100644 --- a/flake.nix +++ b/flake.nix @@ -2,7 +2,7 @@ description = "A Nix-based continuous build system"; inputs.nixpkgs.follows = "nix/nixpkgs"; - inputs.nix.url = "github:NixOS/nix/2.11.0"; + inputs.nix.url = "github:nixos/nix/2.13.3"; outputs = { self, nixpkgs, nix }: let @@ -272,6 +272,7 @@ tests.install = forEachSystem (system: with import (nixpkgs + "/nixos/lib/testing-python.nix") { inherit system; }; simpleTest { + name = "hydra-install"; nodes.machine = hydraServer; testScript = '' @@ -279,7 +280,7 @@ machine.wait_for_job("hydra-server") machine.wait_for_job("hydra-evaluator") machine.wait_for_job("hydra-queue-runner") - machine.wait_for_open_port("3000") + machine.wait_for_open_port(3000) machine.succeed("curl --fail http://localhost:3000/") ''; }); @@ -288,6 +289,7 @@ let pkgs = pkgsBySystem.${system}; in with import (nixpkgs + "/nixos/lib/testing-python.nix") { inherit system; }; simpleTest { + name = "hydra-notifications"; nodes.machine = { pkgs, ... }: { imports = [ hydraServer ]; services.hydra-dev.extraConfig = '' @@ -315,7 +317,7 @@ # Wait until InfluxDB can receive web requests machine.wait_for_job("influxdb") - machine.wait_for_open_port("8086") + machine.wait_for_open_port(8086) # Create an InfluxDB database where hydra will write to machine.succeed( @@ -325,7 +327,7 @@ # Wait until hydra-server can receive HTTP requests machine.wait_for_job("hydra-server") - machine.wait_for_open_port("3000") + machine.wait_for_open_port(3000) # Setup the project and jobset machine.succeed( @@ -346,6 +348,7 @@ let pkgs = pkgsBySystem.${system}; in with import (nixpkgs + "/nixos/lib/testing-python.nix") { inherit system; }; makeTest { + name = "hydra-gitea"; nodes.machine = { pkgs, ... }: { imports = [ hydraServer ]; services.hydra-dev.extraConfig = '' diff --git a/src/hydra-eval-jobs/hydra-eval-jobs.cc b/src/hydra-eval-jobs/hydra-eval-jobs.cc index 18d39620..de7ae7ba 100644 --- a/src/hydra-eval-jobs/hydra-eval-jobs.cc +++ b/src/hydra-eval-jobs/hydra-eval-jobs.cc @@ -129,7 +129,7 @@ static void worker( LockFlags { .updateLockFile = false, .useRegistries = false, - .allowMutable = false, + .allowUnlocked = false, }); callFlake(state, lockedFlake, *vFlake); diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 723bf223..b84681d5 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -8,6 +8,8 @@ #include +#include + #include "state.hh" #include "hydra-build-result.hh" #include "store-api.hh" @@ -15,20 +17,11 @@ #include "globals.hh" #include "hydra-config.hh" -#include "json.hh" #include "s3-binary-cache-store.hh" #include "shared.hh" using namespace nix; - - -namespace nix { - -template<> void toJSON>(std::ostream & str, const std::atomic & n) { str << n; } -template<> void toJSON>(std::ostream & str, const std::atomic & n) { str << n; } -template<> void toJSON(std::ostream & str, const double & n) { str << n; } - -} +using nlohmann::json; std::string getEnvOrDie(const std::string & key) @@ -542,181 +535,166 @@ std::shared_ptr State::acquireGlobalLock() void State::dumpStatus(Connection & conn) { - std::ostringstream out; + time_t now = time(0); + json statusJson = { + {"status", "up"}, + {"time", time(0)}, + {"uptime", now - startedAt}, + {"pid", getpid()}, + {"nrQueuedBuilds", builds.lock()->size()}, + {"nrActiveSteps", activeSteps_.lock()->size()}, + {"nrStepsBuilding", nrStepsBuilding.load()}, + {"nrStepsCopyingTo", nrStepsCopyingTo.load()}, + {"nrStepsCopyingFrom", nrStepsCopyingFrom.load()}, + {"nrStepsWaiting", nrStepsWaiting.load()}, + {"nrUnsupportedSteps", nrUnsupportedSteps.load()}, + {"bytesSent", bytesSent.load()}, + {"bytesReceived", bytesReceived.load()}, + {"nrBuildsRead", nrBuildsRead.load()}, + {"buildReadTimeMs", buildReadTimeMs.load()}, + {"buildReadTimeAvgMs", nrBuildsRead == 0 ? 0.0 : (float) buildReadTimeMs / nrBuildsRead}, + {"nrBuildsDone", nrBuildsDone.load()}, + {"nrStepsStarted", nrStepsStarted.load()}, + {"nrStepsDone", nrStepsDone.load()}, + {"nrRetries", nrRetries.load()}, + {"maxNrRetries", maxNrRetries.load()}, + {"nrQueueWakeups", nrQueueWakeups.load()}, + {"nrDispatcherWakeups", nrDispatcherWakeups.load()}, + {"dispatchTimeMs", dispatchTimeMs.load()}, + {"dispatchTimeAvgMs", nrDispatcherWakeups == 0 ? 0.0 : (float) dispatchTimeMs / nrDispatcherWakeups}, + {"nrDbConnections", dbPool.count()}, + {"nrActiveDbUpdates", nrActiveDbUpdates.load()}, + }; { - JSONObject root(out); - time_t now = time(0); - root.attr("status", "up"); - root.attr("time", time(0)); - root.attr("uptime", now - startedAt); - root.attr("pid", getpid()); - { - auto builds_(builds.lock()); - root.attr("nrQueuedBuilds", builds_->size()); - } { auto steps_(steps.lock()); for (auto i = steps_->begin(); i != steps_->end(); ) if (i->second.lock()) ++i; else i = steps_->erase(i); - root.attr("nrUnfinishedSteps", steps_->size()); + statusJson["nrUnfinishedSteps"] = steps_->size(); } { auto runnable_(runnable.lock()); for (auto i = runnable_->begin(); i != runnable_->end(); ) if (i->lock()) ++i; else i = runnable_->erase(i); - root.attr("nrRunnableSteps", runnable_->size()); + statusJson["nrRunnableSteps"] = runnable_->size(); } - root.attr("nrActiveSteps", activeSteps_.lock()->size()); - root.attr("nrStepsBuilding", nrStepsBuilding); - root.attr("nrStepsCopyingTo", nrStepsCopyingTo); - root.attr("nrStepsCopyingFrom", nrStepsCopyingFrom); - root.attr("nrStepsWaiting", nrStepsWaiting); - root.attr("nrUnsupportedSteps", nrUnsupportedSteps); - root.attr("bytesSent", bytesSent); - root.attr("bytesReceived", bytesReceived); - root.attr("nrBuildsRead", nrBuildsRead); - root.attr("buildReadTimeMs", buildReadTimeMs); - root.attr("buildReadTimeAvgMs", nrBuildsRead == 0 ? 0.0 : (float) buildReadTimeMs / nrBuildsRead); - root.attr("nrBuildsDone", nrBuildsDone); - root.attr("nrStepsStarted", nrStepsStarted); - root.attr("nrStepsDone", nrStepsDone); - root.attr("nrRetries", nrRetries); - root.attr("maxNrRetries", maxNrRetries); if (nrStepsDone) { - root.attr("totalStepTime", totalStepTime); - root.attr("totalStepBuildTime", totalStepBuildTime); - root.attr("avgStepTime", (float) totalStepTime / nrStepsDone); - root.attr("avgStepBuildTime", (float) totalStepBuildTime / nrStepsDone); + statusJson["totalStepTime"] = totalStepTime.load(); + statusJson["totalStepBuildTime"] = totalStepBuildTime.load(); + statusJson["avgStepTime"] = (float) totalStepTime / nrStepsDone; + statusJson["avgStepBuildTime"] = (float) totalStepBuildTime / nrStepsDone; } - root.attr("nrQueueWakeups", nrQueueWakeups); - root.attr("nrDispatcherWakeups", nrDispatcherWakeups); - root.attr("dispatchTimeMs", dispatchTimeMs); - root.attr("dispatchTimeAvgMs", nrDispatcherWakeups == 0 ? 0.0 : (float) dispatchTimeMs / nrDispatcherWakeups); - root.attr("nrDbConnections", dbPool.count()); - root.attr("nrActiveDbUpdates", nrActiveDbUpdates); { - auto nested = root.object("machines"); auto machines_(machines.lock()); for (auto & i : *machines_) { auto & m(i.second); auto & s(m->state); - auto nested2 = nested.object(m->sshName); - nested2.attr("enabled", m->enabled); - - { - auto list = nested2.list("systemTypes"); - for (auto & s : m->systemTypes) - list.elem(s); - } - - { - auto list = nested2.list("supportedFeatures"); - for (auto & s : m->supportedFeatures) - list.elem(s); - } - - { - auto list = nested2.list("mandatoryFeatures"); - for (auto & s : m->mandatoryFeatures) - list.elem(s); - } - - nested2.attr("currentJobs", s->currentJobs); - if (s->currentJobs == 0) - nested2.attr("idleSince", s->idleSince); - nested2.attr("nrStepsDone", s->nrStepsDone); - if (m->state->nrStepsDone) { - nested2.attr("totalStepTime", s->totalStepTime); - nested2.attr("totalStepBuildTime", s->totalStepBuildTime); - nested2.attr("avgStepTime", (float) s->totalStepTime / s->nrStepsDone); - nested2.attr("avgStepBuildTime", (float) s->totalStepBuildTime / s->nrStepsDone); - } - auto info(m->state->connectInfo.lock()); - nested2.attr("disabledUntil", std::chrono::system_clock::to_time_t(info->disabledUntil)); - nested2.attr("lastFailure", std::chrono::system_clock::to_time_t(info->lastFailure)); - nested2.attr("consecutiveFailures", info->consecutiveFailures); + json machine = { + {"enabled", m->enabled}, + {"systemTypes", m->systemTypes}, + {"supportedFeatures", m->supportedFeatures}, + {"mandatoryFeatures", m->mandatoryFeatures}, + {"nrStepsDone", s->nrStepsDone.load()}, + {"currentJobs", s->currentJobs.load()}, + {"disabledUntil", std::chrono::system_clock::to_time_t(info->disabledUntil)}, + {"lastFailure", std::chrono::system_clock::to_time_t(info->lastFailure)}, + {"consecutiveFailures", info->consecutiveFailures}, + }; + + if (s->currentJobs == 0) + machine["idleSince"] = s->idleSince.load(); + if (m->state->nrStepsDone) { + machine["totalStepTime"] = s->totalStepTime.load(); + machine["totalStepBuildTime"] = s->totalStepBuildTime.load(); + machine["avgStepTime"] = (float) s->totalStepTime / s->nrStepsDone; + machine["avgStepBuildTime"] = (float) s->totalStepBuildTime / s->nrStepsDone; + } + statusJson["machines"][m->sshName] = machine; } } { - auto nested = root.object("jobsets"); + auto jobsets_json = statusJson["jobsets"] = json::object(); auto jobsets_(jobsets.lock()); for (auto & jobset : *jobsets_) { - auto nested2 = nested.object(jobset.first.first + ":" + jobset.first.second); - nested2.attr("shareUsed", jobset.second->shareUsed()); - nested2.attr("seconds", jobset.second->getSeconds()); + jobsets_json[jobset.first.first + ":" + jobset.first.second] = { + {"shareUsed", jobset.second->shareUsed()}, + {"seconds", jobset.second->getSeconds()}, + }; } } { - auto nested = root.object("machineTypes"); + auto machineTypesJson = statusJson["machineTypes"] = json::object(); auto machineTypes_(machineTypes.lock()); for (auto & i : *machineTypes_) { - auto nested2 = nested.object(i.first); - nested2.attr("runnable", i.second.runnable); - nested2.attr("running", i.second.running); + auto machineTypeJson = machineTypesJson[i.first] = { + {"runnable", i.second.runnable}, + {"running", i.second.running}, + }; if (i.second.runnable > 0) - nested2.attr("waitTime", i.second.waitTime.count() + - i.second.runnable * (time(0) - lastDispatcherCheck)); + machineTypeJson["waitTime"] = i.second.waitTime.count() + + i.second.runnable * (time(0) - lastDispatcherCheck); if (i.second.running == 0) - nested2.attr("lastActive", std::chrono::system_clock::to_time_t(i.second.lastActive)); + machineTypeJson["lastActive"] = std::chrono::system_clock::to_time_t(i.second.lastActive); } } auto store = getDestStore(); - auto nested = root.object("store"); - auto & stats = store->getStats(); - nested.attr("narInfoRead", stats.narInfoRead); - nested.attr("narInfoReadAverted", stats.narInfoReadAverted); - nested.attr("narInfoMissing", stats.narInfoMissing); - nested.attr("narInfoWrite", stats.narInfoWrite); - nested.attr("narInfoCacheSize", stats.pathInfoCacheSize); - nested.attr("narRead", stats.narRead); - nested.attr("narReadBytes", stats.narReadBytes); - nested.attr("narReadCompressedBytes", stats.narReadCompressedBytes); - nested.attr("narWrite", stats.narWrite); - nested.attr("narWriteAverted", stats.narWriteAverted); - nested.attr("narWriteBytes", stats.narWriteBytes); - nested.attr("narWriteCompressedBytes", stats.narWriteCompressedBytes); - nested.attr("narWriteCompressionTimeMs", stats.narWriteCompressionTimeMs); - nested.attr("narCompressionSavings", - stats.narWriteBytes - ? 1.0 - (double) stats.narWriteCompressedBytes / stats.narWriteBytes - : 0.0); - nested.attr("narCompressionSpeed", // MiB/s + statusJson["store"] = { + {"narInfoRead", stats.narInfoRead.load()}, + {"narInfoReadAverted", stats.narInfoReadAverted.load()}, + {"narInfoMissing", stats.narInfoMissing.load()}, + {"narInfoWrite", stats.narInfoWrite.load()}, + {"narInfoCacheSize", stats.pathInfoCacheSize.load()}, + {"narRead", stats.narRead.load()}, + {"narReadBytes", stats.narReadBytes.load()}, + {"narReadCompressedBytes", stats.narReadCompressedBytes.load()}, + {"narWrite", stats.narWrite.load()}, + {"narWriteAverted", stats.narWriteAverted.load()}, + {"narWriteBytes", stats.narWriteBytes.load()}, + {"narWriteCompressedBytes", stats.narWriteCompressedBytes.load()}, + {"narWriteCompressionTimeMs", stats.narWriteCompressionTimeMs.load()}, + {"narCompressionSavings", + stats.narWriteBytes + ? 1.0 - (double) stats.narWriteCompressedBytes / stats.narWriteBytes + : 0.0}, + {"narCompressionSpeed", // MiB/s stats.narWriteCompressionTimeMs ? (double) stats.narWriteBytes / stats.narWriteCompressionTimeMs * 1000.0 / (1024.0 * 1024.0) - : 0.0); + : 0.0}, + }; auto s3Store = dynamic_cast(&*store); if (s3Store) { - auto nested2 = nested.object("s3"); auto & s3Stats = s3Store->getS3Stats(); - nested2.attr("put", s3Stats.put); - nested2.attr("putBytes", s3Stats.putBytes); - nested2.attr("putTimeMs", s3Stats.putTimeMs); - nested2.attr("putSpeed", - s3Stats.putTimeMs - ? (double) s3Stats.putBytes / s3Stats.putTimeMs * 1000.0 / (1024.0 * 1024.0) - : 0.0); - nested2.attr("get", s3Stats.get); - nested2.attr("getBytes", s3Stats.getBytes); - nested2.attr("getTimeMs", s3Stats.getTimeMs); - nested2.attr("getSpeed", - s3Stats.getTimeMs - ? (double) s3Stats.getBytes / s3Stats.getTimeMs * 1000.0 / (1024.0 * 1024.0) - : 0.0); - nested2.attr("head", s3Stats.head); - nested2.attr("costDollarApprox", - (s3Stats.get + s3Stats.head) / 10000.0 * 0.004 - + s3Stats.put / 1000.0 * 0.005 + - + s3Stats.getBytes / (1024.0 * 1024.0 * 1024.0) * 0.09); + auto jsonS3 = statusJson["s3"] = { + {"put", s3Stats.put.load()}, + {"putBytes", s3Stats.putBytes.load()}, + {"putTimeMs", s3Stats.putTimeMs.load()}, + {"putSpeed", + s3Stats.putTimeMs + ? (double) s3Stats.putBytes / s3Stats.putTimeMs * 1000.0 / (1024.0 * 1024.0) + : 0.0}, + {"get", s3Stats.get.load()}, + {"getBytes", s3Stats.getBytes.load()}, + {"getTimeMs", s3Stats.getTimeMs.load()}, + {"getSpeed", + s3Stats.getTimeMs + ? (double) s3Stats.getBytes / s3Stats.getTimeMs * 1000.0 / (1024.0 * 1024.0) + : 0.0}, + {"head", s3Stats.head.load()}, + {"costDollarApprox", + (s3Stats.get + s3Stats.head) / 10000.0 * 0.004 + + s3Stats.put / 1000.0 * 0.005 + + + s3Stats.getBytes / (1024.0 * 1024.0 * 1024.0) * 0.09}, + }; } } @@ -725,7 +703,7 @@ void State::dumpStatus(Connection & conn) pqxx::work txn(conn); // FIXME: use PostgreSQL 9.5 upsert. txn.exec("delete from SystemStatus where what = 'queue-runner'"); - txn.exec_params0("insert into SystemStatus values ('queue-runner', $1)", out.str()); + txn.exec_params0("insert into SystemStatus values ('queue-runner', $1)", statusJson.dump()); txn.exec("notify status_dumped"); txn.commit(); }