From 1405be8aca81e55ee20d0d3f9c7c48c3af749999 Mon Sep 17 00:00:00 2001 From: K900 Date: Mon, 30 Sep 2024 14:36:15 +0300 Subject: [PATCH] [WIP] Hydra metrics --- .../exporters/hydra/hydra-exporter.py | 360 ++++++++++++++++++ 1 file changed, 360 insertions(+) create mode 100755 services/monitoring/exporters/hydra/hydra-exporter.py diff --git a/services/monitoring/exporters/hydra/hydra-exporter.py b/services/monitoring/exporters/hydra/hydra-exporter.py new file mode 100755 index 0000000..1e7550b --- /dev/null +++ b/services/monitoring/exporters/hydra/hydra-exporter.py @@ -0,0 +1,360 @@ +#!/usr/bin/env nix-shell +#!nix-shell -i python3 -p "python3.withPackages(ps: [ps.aioprometheus ps.click ps.httpx ps.starlette ps.uvicorn])" +import asyncio +from contextlib import asynccontextmanager +import logging +from aioprometheus import Counter, Gauge +from aioprometheus.asgi.starlette import metrics +import click +import httpx +from starlette.applications import Starlette +from starlette.routing import Route +import uvicorn + + +up = Gauge("hydra_up", "Is Hydra running") +time = Gauge("hydra_time", "Hydra's current time") +uptime = Gauge("hydra_uptime", "Hydra's uptime") + +builds_queued = Gauge("hydra_builds_queued", "Number of jobs in build queue") +steps_active = Gauge("hydra_steps_active", "Number of active steps in build queue") +steps_building = Gauge("hydra_steps_building", "Number of steps currently building") +steps_copying_to = Gauge( + "hydra_steps_copying_to", "Number of steps copying inputs to a worker" +) +steps_copying_from = Gauge( + "hydra_steps_copying_from", "Number of steps copying outputs from a worker" +) +steps_waiting = Gauge( + "hydra_steps_waiting", "Number of steps currently waiting for a worker slot" +) +steps_unsupported = Gauge( + "hydra_steps_unsupported", "Number of unsupported steps in build queue" +) + +bytes_sent = Counter( + "hydra_build_inputs_sent_bytes_total", + "Total number of bytes copied to workers as build inputs", +) +bytes_received = Counter( + "hydra_build_outputs_received_bytes_total", + "Total number of bytes copied from workers as build outputs", +) + +builds_read = Counter( + "hydra_builds_read_total", + "Total number of builds whose outputs have been copied from workers", +) +builds_read_seconds = Counter( + "hydra_builds_read_seconds_total", + "Total time spent copying build outputs, in seconds", +) + +builds_done = Counter("hydra_builds_done_total", "Total number of builds completed") +steps_started = Counter("hydra_steps_started_total", "Total number of steps started") +steps_done = Counter("hydra_steps_done_total", "Total number of steps completed") + +retries = Counter("hydra_retries_total", "Total number of retries") +max_retries = Gauge( + "hydra_max_retries", "Maximum observed number of retries for a single step" +) + +queue_wakeups = Counter( + "hydra_queue_wakeup_total", + "Count of the times the queue runner has been notified of queue changes", +) +dispatcher_wakeups = Counter( + "hydra_dispatcher_wakeup_total", + "Count of the times the queue runner work dispatcher woke up due to new runnable builds and completed builds.", +) + +dispatch_time = Counter( + "hydra_dispatch_execution_seconds_total", + "Total time the dispatcher has spent working, in seconds", +) + +db_connections = Gauge("hydra_db_connections", "Number of connections to the database") +active_db_updates = Gauge("hydra_db_updates", "Number of in-progress database updates") + +steps_queued = Gauge("hydra_steps_queued", "Number of steps in build queue") +steps_runnable = Gauge( + "hydra_steps_runnable", "Number of runnable steps in build queue" +) + +step_time = Counter( + "hydra_step_time_total", "Total time spent executing steps, in seconds" +) +step_build_time = Counter( + "hydra_step_build_time_total", "Total time spent executing build steps, in seconds" +) + +machine_enabled = Gauge("hydra_machine_enabled", "Whether machine is enabled") +machine_steps_done = Counter( + "hydra_machine_steps_done_total", "Total number of steps completed by this worker" +) +machine_current_jobs = Gauge( + "hydra_machine_current_jobs", "Number of jobs currently running on this worker" +) +machine_disabled_until = Gauge( + "hydra_machine_disabled_until", + "Timestamp of when this worker will next become active", +) +machine_last_failure = Gauge( + "hydra_machine_last_failure", "Timestamp of when a build last failed on this worker" +) +machine_consecutive_failures = Gauge( + "hydra_machine_consecutive_failures", + "Number of consecutive failed builds on this worker", +) + +machine_idle_since = Gauge( + "hydra_machine_idle_since", "Timestamp of when this worker last had jobs running" +) +machine_step_time = Counter( + "hydra_machine_step_time_total", + "Total time this worker spent executing steps, in seconds", +) +machine_step_build_time = Counter( + "hydra_machine_step_build_time_total", + "Total time this worker spent executing build steps, in seconds", +) + +jobset_time = Counter( + "hydra_jobset_seconds_total", + "Total time this jobset has been building for, in seconds", +) +jobset_shares_used = Gauge( + "hydra_jobset_shares_used", "Number of shares currently consumed by this jobset" +) + +machine_type_runnable = Gauge( + "hydra_machine_type_runnable", + "Number of steps currently runnable on this machine type", +) +machine_type_running = Gauge( + "hydra_machine_type_running", + "Number of steps currently running on this machine type", +) +machine_type_wait_time = Counter( + "hydra_machine_type_wait_time_total", + "Total time spent waiting for a build slot of this machine type", +) +machine_type_last_active = Gauge( + "hydra_machine_type_last_active", + "Timestamp of when a machine of this type was last active", +) + +store_nar_info_read = Counter( + "hydra_store_nar_info_read_total", + "Total number of narinfo files read from the remote store", +) +store_nar_info_read_averted = Counter( + "hydra_store_nar_info_read_averted_total", + "Total number of narinfo file reads averted (already loaded)", +) +store_nar_info_missing = Counter( + "hydra_store_nar_info_missing_total", + "Total number of narinfo files found to be missing", +) +store_nar_info_write = Counter( + "hydra_store_nar_info_write_total", + "Total number of narinfo files written to the remote store", +) +store_nar_info_cache_size = Gauge( + "hydra_store_nar_info_cache_size", + "Size of the in-memory store path information cache", +) +store_nar_read = Counter( + "hydra_store_nar_read_total", "Total number of NAR files read from the remote store" +) +store_nar_read_bytes = Counter( + "hydra_store_nar_read_bytes_total", + "Total number of NAR file bytes read from the remote store (uncompressed)", +) +store_nar_read_compressed_bytes = Counter( + "hydra_store_nar_read_compressed_bytes_total", + "Total number of NAR file bytes read from the remote store (compressed)", +) +store_nar_write = Counter( + "hydra_store_nar_write_total", + "Total number of NAR files written to the remote store", +) +store_nar_write_averted = Counter( + "hydra_store_nar_write_averted_total", + "Total number of NAR file writes averted (already exists on remote)", +) +store_nar_write_bytes = Counter( + "hydra_store_nar_write_bytes_total", + "Total number of NAR file bytes written to the remote store (uncompressed)", +) +store_nar_write_compressed_bytes = Counter( + "hydra_store_nar_write_compressed_bytes_total", + "Total number of NAR file bytes written to the remote store (compressed)", +) +store_nar_write_compression_seconds = Counter( + "hydra_store_nar_write_compression_seconds_total", + "Total time spent compressing NAR files for writing to the remote store", +) + +store_s3_put = Counter( + "hydra_store_s3_put_total", "Total number of PUT requests to S3 store" +) +store_s3_put_bytes = Counter( + "hydra_store_s3_put_bytes_total", "Total number of bytes written to S3 store" +) +store_s3_put_seconds = Counter( + "hydra_store_s3_put_seconds_total", + "Total time spent writing to S3 store, in seconds", +) +store_s3_get = Counter( + "hydra_store_s3_get_total", "Total number of GET requests to S3 store" +) +store_s3_get_bytes = Counter( + "hydra_store_s3_get_bytes_total", "Total number of bytes read from S3 store" +) +store_s3_get_seconds = Counter( + "hydra_store_s3_get_seconds_total", + "Total time spent reading from S3 store, in seconds", +) +store_s3_head = Counter( + "hydra_store_s3_head_total", "Total number of HEAD requests to S3 store" +) + + +def update_metrics(status): + up.set({}, int(status["status"] == "up")) + time.set({}, status["time"]) + uptime.set({}, status["uptime"]) + + builds_queued.set({}, status["nrQueuedBuilds"]) + steps_active.set({}, status["nrActiveSteps"]) + steps_building.set({}, status["nrStepsBuilding"]) + steps_copying_to.set({}, status["nrStepsCopyingTo"]) + steps_copying_from.set({}, status["nrStepsCopyingFrom"]) + steps_waiting.set({}, status["nrStepsWaiting"]) + steps_unsupported.set({}, status["nrUnsupportedSteps"]) + + bytes_sent.set({}, status["bytesSent"]) + bytes_received.set({}, status["bytesReceived"]) + + builds_read.set({}, status["nrBuildsRead"]) + builds_read_seconds.set({}, status["buildReadTimeMs"] / 1000) + + builds_done.set({}, status["nrBuildsDone"]) + steps_started.set({}, status["nrStepsStarted"]) + steps_done.set({}, status["nrStepsDone"]) + + retries.set({}, status["nrRetries"]) + max_retries.set({}, status["maxNrRetries"]) + + queue_wakeups.set({}, status["nrQueueWakeups"]) + dispatcher_wakeups.set({}, status["nrDispatcherWakeups"]) + dispatch_time.set({}, status["dispatchTimeMs"] / 1000) + + db_connections.set({}, status["nrDbConnections"]) + active_db_updates.set({}, status["nrActiveDbUpdates"]) + + steps_queued.set({}, status["nrUnfinishedSteps"]) + steps_runnable.set({}, status["nrRunnableSteps"]) + + if st := status.get("totalStepTime"): + step_time.set({}, st) + + if sbt := status.get("totalStepBuildTime"): + step_build_time.set({}, sbt) + + for machine_name, machine_status in status["machines"].items(): + labels = {"host": machine_name} + machine_enabled.set(labels, int(machine_status["enabled"])) + machine_steps_done.set(labels, machine_status["nrStepsDone"]) + machine_current_jobs.set(labels, machine_status["currentJobs"]) + machine_disabled_until.set(labels, machine_status["disabledUntil"]) + machine_last_failure.set(labels, machine_status["lastFailure"]) + machine_consecutive_failures.set(labels, machine_status["consecutiveFailures"]) + + if isn := machine_status.get("idleSince"): + machine_idle_since.set(labels, isn) + + if st := machine_status.get("totalStepTime"): + machine_step_time.set(labels, st) + + if sbt := machine_status.get("totalStepBuildTime"): + machine_step_build_time.set(labels, sbt) + + for jobset_name, jobset_status in status["jobsets"].items(): + labels = {"name": jobset_name} + jobset_time.set(labels, jobset_status["seconds"]) + jobset_shares_used.set(labels, jobset_status["shareUsed"]) + + for type_name, type_status in status["machineTypes"].items(): + labels = {"machineType": type_name} + machine_type_runnable.set(labels, type_status["runnable"]) + machine_type_running.set(labels, type_status["running"]) + + if wt := type_status.get("waitTime"): + machine_type_wait_time.set(labels, wt) + + if la := type_status.get("lastActive"): + machine_type_last_active.set(labels, la) + + store = status["store"] + store_nar_info_read.set({}, store["narInfoRead"]) + store_nar_info_read_averted.set({}, store["narInfoReadAverted"]) + store_nar_info_missing.set({}, store["narInfoMissing"]) + store_nar_info_write.set({}, store["narInfoWrite"]) + store_nar_info_cache_size.set({}, store["narInfoCacheSize"]) + store_nar_read.set({}, store["narRead"]) + store_nar_read_bytes.set({}, store["narReadBytes"]) + store_nar_read_compressed_bytes.set({}, store["narReadCompressedBytes"]) + store_nar_write.set({}, store["narWrite"]) + store_nar_write_averted.set({}, store["narWriteAverted"]) + store_nar_write_bytes.set({}, store["narWriteBytes"]) + store_nar_write_compressed_bytes.set({}, store["narWriteCompressedBytes"]) + store_nar_write_compression_seconds.set( + {}, store["narWriteCompressionTimeMs"] / 1000 + ) + + if s3 := status.get("s3"): + store_s3_put.set({}, s3["put"]) + store_s3_put_bytes.set({}, s3["putBytes"]) + store_s3_put_seconds.set({}, s3["putTimeMs"] / 1000) + store_s3_get.set({}, s3["get"]) + store_s3_get_bytes.set({}, s3["getBytes"]) + store_s3_get_seconds.set({}, s3["getTimeMs"] / 1000) + store_s3_head.set({}, s3["head"]) + + +async def update_metrics_loop(hydra_url, scrape_interval): + async with httpx.AsyncClient(base_url=hydra_url) as client: + while True: + try: + response = await client.get( + "/queue-runner-status", + headers={"Content-Type": "application/json"}, + ) + + update_metrics(response.json()) + + await asyncio.sleep(scrape_interval) + except Exception as ex: + logging.exception("Failed to update metrics", exc_info=ex) + + +@click.command() +@click.option("--hydra-url", default="https://hydra.forkos.org/") +@click.option("--port", default=9200) +@click.option("--scrape-interval", default=15) +def main(hydra_url, port, scrape_interval): + @asynccontextmanager + async def lifespan(_): + loop = asyncio.get_event_loop() + loop.create_task(update_metrics_loop(hydra_url, scrape_interval)) + yield + + app = Starlette(routes=[Route("/metrics", metrics)], lifespan=lifespan) + + uvicorn.run(app, port=port, log_level="info") + + +if __name__ == "__main__": + main()